From 648d22ee4046baea1aeed7443361c47b5ea94532 Mon Sep 17 00:00:00 2001 From: dragondriver Date: Thu, 19 Aug 2021 10:28:10 +0800 Subject: [PATCH] Expose metrics of IndexCoord and IndexNodes (#7157) Signed-off-by: dragondriver --- .../distributed/indexcoord/client/client.go | 7 + internal/distributed/indexcoord/service.go | 4 + .../distributed/indexnode/client/client.go | 7 + internal/distributed/indexnode/service.go | 4 + internal/indexcoord/errors.go | 29 +++ internal/indexcoord/errors_test.go | 44 ++++ internal/indexcoord/index_coord.go | 73 +++++++ internal/indexcoord/index_coord_test.go | 80 +++++++ internal/indexcoord/metrics_info.go | 119 +++++++++++ internal/indexcoord/metrics_info_test.go | 22 ++ internal/indexcoord/node_manager.go | 23 ++ internal/indexcoord/node_manager_test.go | 22 ++ internal/indexnode/errors.go | 25 +++ internal/indexnode/errors_test.go | 35 +++ internal/indexnode/indexnode.go | 74 +++++++ internal/indexnode/indexnode_test.go | 15 ++ internal/indexnode/metrics_info.go | 56 +++++ internal/indexnode/metrics_info_test.go | 22 ++ internal/proto/index_coord.proto | 6 + internal/proto/indexpb/index_coord.pb.go | 199 ++++++++++++------ internal/types/types.go | 1 + internal/util/metricsinfo/metric_type.go | 17 ++ internal/util/metricsinfo/metric_type_test.go | 23 +- internal/util/metricsinfo/metrics_info.go | 12 ++ .../util/metricsinfo/metrics_info_test.go | 32 +++ internal/util/metricsinfo/topology.go | 12 ++ internal/util/metricsinfo/topology_test.go | 132 +++++++++++- 27 files changed, 1031 insertions(+), 64 deletions(-) create mode 100644 internal/indexcoord/errors.go create mode 100644 internal/indexcoord/errors_test.go create mode 100644 internal/indexcoord/metrics_info.go create mode 100644 internal/indexcoord/metrics_info_test.go create mode 100644 internal/indexcoord/node_manager_test.go create mode 100644 internal/indexnode/errors.go create mode 100644 internal/indexnode/errors_test.go create mode 100644 internal/indexnode/metrics_info.go create mode 100644 internal/indexnode/metrics_info_test.go diff --git a/internal/distributed/indexcoord/client/client.go b/internal/distributed/indexcoord/client/client.go index 3e7c2f500d..5a7b23bf34 100644 --- a/internal/distributed/indexcoord/client/client.go +++ b/internal/distributed/indexcoord/client/client.go @@ -202,3 +202,10 @@ func (c *Client) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFil }) return ret.(*indexpb.GetIndexFilePathsResponse), err } + +func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + ret, err := c.recall(func() (interface{}, error) { + return c.grpcClient.GetMetrics(ctx, req) + }) + return ret.(*milvuspb.GetMetricsResponse), err +} diff --git a/internal/distributed/indexcoord/service.go b/internal/distributed/indexcoord/service.go index bb84025cb2..7fe7a5b1ca 100644 --- a/internal/distributed/indexcoord/service.go +++ b/internal/distributed/indexcoord/service.go @@ -146,6 +146,10 @@ func (s *Server) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFil return s.indexcoord.GetIndexFilePaths(ctx, req) } +func (s *Server) GetMetrics(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + return s.indexcoord.GetMetrics(ctx, request) +} + func (s *Server) startGrpcLoop(grpcPort int) { defer s.loopWg.Done() diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index 5eca4e1e02..5f63f0926e 100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -160,3 +160,10 @@ func (c *Client) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques }) return ret.(*commonpb.Status), err } + +func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + ret, err := c.recall(func() (interface{}, error) { + return c.grpcClient.GetMetrics(ctx, req) + }) + return ret.(*milvuspb.GetMetricsResponse), err +} diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 811a020c84..dd0a4c5c4e 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -184,6 +184,10 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques return s.indexnode.CreateIndex(ctx, req) } +func (s *Server) GetMetrics(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + return s.indexnode.GetMetrics(ctx, request) +} + func NewServer(ctx context.Context) (*Server, error) { ctx1, cancel := context.WithCancel(ctx) node, err := indexnode.NewIndexNode(ctx1) diff --git a/internal/indexcoord/errors.go b/internal/indexcoord/errors.go new file mode 100644 index 0000000000..890b1da4fc --- /dev/null +++ b/internal/indexcoord/errors.go @@ -0,0 +1,29 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package indexcoord + +import ( + "errors" + "fmt" +) + +func errIndexNodeIsNotOnService(id UniqueID) error { + return fmt.Errorf("index node %d is not on service", id) +} + +func msgIndexCoordIsUnhealthy(coordID UniqueID) string { + return fmt.Sprintf("IndexCoord %d is not ready", coordID) +} + +func errIndexCoordIsUnhealthy(coordID UniqueID) error { + return errors.New(msgIndexCoordIsUnhealthy(coordID)) +} diff --git a/internal/indexcoord/errors_test.go b/internal/indexcoord/errors_test.go new file mode 100644 index 0000000000..962cf4deae --- /dev/null +++ b/internal/indexcoord/errors_test.go @@ -0,0 +1,44 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package indexcoord + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +func TestErrIndexNodeIsNotOnService(t *testing.T) { + indexNodeIDList := []UniqueID{ + 1, + } + + for _, id := range indexNodeIDList { + log.Info("TestErrIndexNodeIsNotOnService", + zap.Error(errIndexNodeIsNotOnService(id))) + } +} + +func TestMsgIndexCoordIsUnhealthy(t *testing.T) { + nodeIDList := []UniqueID{1, 2, 3} + for _, nodeID := range nodeIDList { + log.Info("TestMsgIndexCoordIsUnhealthy", zap.String("msg", msgIndexCoordIsUnhealthy(nodeID))) + } +} + +func TestErrIndexCoordIsUnhealthy(t *testing.T) { + nodeIDList := []UniqueID{1, 2, 3} + for _, nodeID := range nodeIDList { + log.Info("TestErrIndexCoordIsUnhealthy", zap.Error(errIndexCoordIsUnhealthy(nodeID))) + } +} diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index c367bc0d8b..f9bd06bc40 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -21,6 +21,8 @@ import ( "sync/atomic" "time" + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/milvus-io/milvus/internal/util/trace" "github.com/coreos/etcd/mvcc/mvccpb" @@ -231,6 +233,11 @@ func (i *IndexCoord) UpdateStateCode(code internalpb.StateCode) { i.stateCode.Store(code) } +func (i *IndexCoord) isHealthy() bool { + code := i.stateCode.Load().(internalpb.StateCode) + return code == internalpb.StateCode_Healthy +} + func (i *IndexCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { log.Debug("get IndexCoord component states ...") stateInfo := &internalpb.ComponentInfo{ @@ -432,6 +439,72 @@ func (i *IndexCoord) GetIndexFilePaths(ctx context.Context, req *indexpb.GetInde return ret, nil } +func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + log.Debug("IndexCoord.GetMetrics", + zap.Int64("node_id", i.ID), + zap.String("req", req.Request)) + + if !i.isHealthy() { + log.Warn("IndexCoord.GetMetrics failed", + zap.Int64("node_id", i.ID), + zap.String("req", req.Request), + zap.Error(errIndexCoordIsUnhealthy(i.ID))) + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: msgIndexCoordIsUnhealthy(i.ID), + }, + Response: "", + }, nil + } + + metricType, err := metricsinfo.ParseMetricType(req.Request) + if err != nil { + log.Warn("IndexCoord.GetMetrics failed to parse metric type", + zap.Int64("node_id", i.ID), + zap.String("req", req.Request), + zap.Error(err)) + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + }, + Response: "", + }, nil + } + + log.Debug("IndexCoord.GetMetrics", + zap.String("metric_type", metricType)) + + if metricType == metricsinfo.SystemInfoMetrics { + metrics, err := getSystemInfoMetrics(ctx, req, i) + + log.Debug("IndexCoord.GetMetrics", + zap.Int64("node_id", i.ID), + zap.String("req", req.Request), + zap.String("metric_type", metricType), + zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large + zap.Error(err)) + + return metrics, err + } + + log.Debug("IndexCoord.GetMetrics failed, request metric type is not implemented yet", + zap.Int64("node_id", i.ID), + zap.String("req", req.Request), + zap.String("metric_type", metricType)) + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: metricsinfo.MsgUnimplementedMetric, + }, + Response: "", + }, nil +} + func (i *IndexCoord) tsLoop() { tsoTicker := time.NewTicker(tso.UpdateTimestampStep) defer tsoTicker.Stop() diff --git a/internal/indexcoord/index_coord_test.go b/internal/indexcoord/index_coord_test.go index 25269a6874..617a471385 100644 --- a/internal/indexcoord/index_coord_test.go +++ b/internal/indexcoord/index_coord_test.go @@ -17,6 +17,16 @@ import ( "testing" "time" + "github.com/milvus-io/milvus/internal/util/typeutil" + + "github.com/milvus-io/milvus/internal/proto/milvuspb" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/log" + + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" @@ -63,6 +73,66 @@ func (in *indexNodeMock) CreateIndex(ctx context.Context, req *indexpb.CreateInd }, nil } +func getSystemInfoMetricsByIndexNodeMock( + ctx context.Context, + req *milvuspb.GetMetricsRequest, + in *indexNodeMock, +) (*milvuspb.GetMetricsResponse, error) { + + id := UniqueID(16384) + + nodeInfos := metricsinfo.IndexNodeInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{ + Name: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id), + }, + } + resp, err := metricsinfo.MarshalComponentInfos(nodeInfos) + if err != nil { + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + }, + Response: "", + ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id), + }, nil + } + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Response: resp, + ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id), + }, nil +} + +func (in *indexNodeMock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + metricType, err := metricsinfo.ParseMetricType(req.Request) + if err != nil { + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + }, + Response: "", + }, nil + } + + if metricType == metricsinfo.SystemInfoMetrics { + return getSystemInfoMetricsByIndexNodeMock(ctx, req, in) + } + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: metricsinfo.MsgUnimplementedMetric, + }, + Response: "", + }, nil +} + func TestIndexCoord(t *testing.T) { ctx := context.Background() ic, err := NewIndexCoord(ctx) @@ -141,6 +211,16 @@ func TestIndexCoord(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode) }) + t.Run("GetMetrics, system info", func(t *testing.T) { + req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) + assert.Nil(t, err) + resp, err := ic.GetMetrics(ctx, req) + assert.Nil(t, err) + log.Info("GetMetrics, system info", + zap.String("name", resp.ComponentName), + zap.String("resp", resp.Response)) + }) + time.Sleep(11 * time.Second) ic.nodeManager.RemoveNode(indexNodeID) err = ic.Stop() diff --git a/internal/indexcoord/metrics_info.go b/internal/indexcoord/metrics_info.go new file mode 100644 index 0000000000..c2e7ea325c --- /dev/null +++ b/internal/indexcoord/metrics_info.go @@ -0,0 +1,119 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package indexcoord + +import ( + "context" + + "github.com/milvus-io/milvus/internal/util/typeutil" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/log" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/util/metricsinfo" +) + +// TODO(dragondriver): add more detail metrics +func getSystemInfoMetrics( + ctx context.Context, + req *milvuspb.GetMetricsRequest, + coord *IndexCoord, +) (*milvuspb.GetMetricsResponse, error) { + + clusterTopology := metricsinfo.IndexClusterTopology{ + Self: metricsinfo.IndexCoordInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{ + Name: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.ID), + }, + }, + ConnectedNodes: make([]metricsinfo.IndexNodeInfos, 0), + } + + nodesMetrics := coord.nodeManager.getMetrics(ctx, req) + for _, nodeMetrics := range nodesMetrics { + if nodeMetrics.err != nil { + log.Warn("invalid metrics of index node was found", + zap.Error(nodeMetrics.err)) + clusterTopology.ConnectedNodes = append(clusterTopology.ConnectedNodes, metricsinfo.IndexNodeInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{ + HasError: true, + ErrorReason: nodeMetrics.err.Error(), + // Name doesn't matter here cause we can't get it when error occurs, using address as the Name? + Name: "", + }, + }) + continue + } + + if nodeMetrics.resp.Status.ErrorCode != commonpb.ErrorCode_Success { + log.Warn("invalid metrics of index node was found", + zap.Any("error_code", nodeMetrics.resp.Status.ErrorCode), + zap.Any("error_reason", nodeMetrics.resp.Status.Reason)) + clusterTopology.ConnectedNodes = append(clusterTopology.ConnectedNodes, metricsinfo.IndexNodeInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{ + HasError: true, + ErrorReason: nodeMetrics.resp.Status.Reason, + Name: nodeMetrics.resp.ComponentName, + }, + }) + continue + } + + infos := metricsinfo.IndexNodeInfos{} + err := metricsinfo.UnmarshalComponentInfos(nodeMetrics.resp.Response, &infos) + if err != nil { + log.Warn("invalid metrics of index node was found", + zap.Error(err)) + clusterTopology.ConnectedNodes = append(clusterTopology.ConnectedNodes, metricsinfo.IndexNodeInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{ + HasError: true, + ErrorReason: err.Error(), + Name: nodeMetrics.resp.ComponentName, + }, + }) + continue + } + clusterTopology.ConnectedNodes = append(clusterTopology.ConnectedNodes, infos) + } + + coordTopology := metricsinfo.IndexCoordTopology{ + Cluster: clusterTopology, + Connections: metricsinfo.ConnTopology{ + Name: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.ID), + // TODO(dragondriver): connection info + }, + } + + resp, err := metricsinfo.MarshalTopology(coordTopology) + if err != nil { + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + }, + Response: "", + ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.ID), + }, nil + } + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Response: resp, + ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexCoordRole, coord.ID), + }, nil +} diff --git a/internal/indexcoord/metrics_info_test.go b/internal/indexcoord/metrics_info_test.go new file mode 100644 index 0000000000..78e5f5e407 --- /dev/null +++ b/internal/indexcoord/metrics_info_test.go @@ -0,0 +1,22 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package indexcoord + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/log" +) + +func TestGetSystemInfoMetrics(t *testing.T) { + log.Info("TestGetSystemInfoMetrics, todo") +} diff --git a/internal/indexcoord/node_manager.go b/internal/indexcoord/node_manager.go index 94d1a1ac23..6a432a68bf 100644 --- a/internal/indexcoord/node_manager.go +++ b/internal/indexcoord/node_manager.go @@ -15,6 +15,8 @@ import ( "context" "sync" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + grpcindexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/types" @@ -94,3 +96,24 @@ func (nm *NodeManager) PeekClient() (UniqueID, types.IndexNode) { } return nodeID, client } + +type indexNodeGetMetricsResponse struct { + resp *milvuspb.GetMetricsResponse + err error +} + +func (nm *NodeManager) getMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) []indexNodeGetMetricsResponse { + nm.lock.RLock() + defer nm.lock.RUnlock() + + ret := make([]indexNodeGetMetricsResponse, 0, len(nm.nodeClients)) + for _, node := range nm.nodeClients { + resp, err := node.GetMetrics(ctx, req) + ret = append(ret, indexNodeGetMetricsResponse{ + resp: resp, + err: err, + }) + } + + return ret +} diff --git a/internal/indexcoord/node_manager_test.go b/internal/indexcoord/node_manager_test.go new file mode 100644 index 0000000000..c5cb5297c2 --- /dev/null +++ b/internal/indexcoord/node_manager_test.go @@ -0,0 +1,22 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package indexcoord + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/log" +) + +func TestNodeManager_getMetrics(t *testing.T) { + log.Info("TestNodeManager_getMetrics, todo") +} diff --git a/internal/indexnode/errors.go b/internal/indexnode/errors.go new file mode 100644 index 0000000000..be96493a6a --- /dev/null +++ b/internal/indexnode/errors.go @@ -0,0 +1,25 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package indexnode + +import ( + "errors" + "fmt" +) + +func msgIndexNodeIsUnhealthy(nodeID UniqueID) string { + return fmt.Sprintf("index node %d is not ready", nodeID) +} + +func errIndexNodeIsUnhealthy(nodeID UniqueID) error { + return errors.New(msgIndexNodeIsUnhealthy(nodeID)) +} diff --git a/internal/indexnode/errors_test.go b/internal/indexnode/errors_test.go new file mode 100644 index 0000000000..2d591d6340 --- /dev/null +++ b/internal/indexnode/errors_test.go @@ -0,0 +1,35 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package indexnode + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/util/typeutil" + + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +func TestMsgIndexNodeIsUnhealthy(t *testing.T) { + nodeIDList := []typeutil.UniqueID{1, 2, 3} + for _, nodeID := range nodeIDList { + log.Info("TestMsgIndexNodeIsUnhealthy", zap.String("msg", msgIndexNodeIsUnhealthy(nodeID))) + } +} + +func TestErrIndexNodeIsUnhealthy(t *testing.T) { + nodeIDList := []typeutil.UniqueID{1, 2, 3} + for _, nodeID := range nodeIDList { + log.Info("TestErrIndexNodeIsUnhealthy", zap.Error(errIndexNodeIsUnhealthy(nodeID))) + } +} diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 99021e1fbc..8a9b073b0a 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -19,6 +19,8 @@ import ( "sync/atomic" "time" + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "go.uber.org/zap" "github.com/milvus-io/milvus/internal/kv" @@ -145,6 +147,11 @@ func (i *IndexNode) UpdateStateCode(code internalpb.StateCode) { i.stateCode.Store(code) } +func (i *IndexNode) isHealthy() bool { + code := i.stateCode.Load().(internalpb.StateCode) + return code == internalpb.StateCode_Healthy +} + func (i *IndexNode) CreateIndex(ctx context.Context, request *indexpb.CreateIndexRequest) (*commonpb.Status, error) { if i.stateCode.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy { return &commonpb.Status{ @@ -242,3 +249,70 @@ func (i *IndexNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringR }, }, nil } + +// TODO(dragondriver): cache the Metrics and set a retention to the cache +func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + log.Debug("IndexNode.GetMetrics", + zap.Int64("node_id", Params.NodeID), + zap.String("req", req.Request)) + + if !i.isHealthy() { + log.Warn("IndexNode.GetMetrics failed", + zap.Int64("node_id", Params.NodeID), + zap.String("req", req.Request), + zap.Error(errIndexNodeIsUnhealthy(Params.NodeID))) + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: msgIndexNodeIsUnhealthy(Params.NodeID), + }, + Response: "", + }, nil + } + + metricType, err := metricsinfo.ParseMetricType(req.Request) + if err != nil { + log.Warn("IndexNode.GetMetrics failed to parse metric type", + zap.Int64("node_id", Params.NodeID), + zap.String("req", req.Request), + zap.Error(err)) + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + }, + Response: "", + }, nil + } + + log.Debug("IndexNode.GetMetrics", + zap.String("metric_type", metricType)) + + if metricType == metricsinfo.SystemInfoMetrics { + metrics, err := getSystemInfoMetrics(ctx, req, i) + + log.Debug("IndexNode.GetMetrics", + zap.Int64("node_id", Params.NodeID), + zap.String("req", req.Request), + zap.String("metric_type", metricType), + zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large + zap.Error(err)) + + return metrics, err + } + + log.Debug("IndexNode.GetMetrics failed, request metric type is not implemented yet", + zap.Int64("node_id", Params.NodeID), + zap.String("req", req.Request), + zap.String("metric_type", metricType)) + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: metricsinfo.MsgUnimplementedMetric, + }, + Response: "", + }, nil +} diff --git a/internal/indexnode/indexnode_test.go b/internal/indexnode/indexnode_test.go index 92185b2e9f..e27b1c91ef 100644 --- a/internal/indexnode/indexnode_test.go +++ b/internal/indexnode/indexnode_test.go @@ -18,6 +18,11 @@ import ( "testing" "time" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/golang/protobuf/proto" @@ -426,6 +431,16 @@ func TestIndexNode(t *testing.T) { assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) }) + t.Run("GetMetrics_system_info", func(t *testing.T) { + req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) + assert.Nil(t, err) + resp, err := in.GetMetrics(ctx, req) + assert.Nil(t, err) + log.Info("GetMetrics_system_info", + zap.String("resp", resp.Response), + zap.String("name", resp.ComponentName)) + }) + err = in.Stop() assert.Nil(t, err) } diff --git a/internal/indexnode/metrics_info.go b/internal/indexnode/metrics_info.go new file mode 100644 index 0000000000..d178d5da49 --- /dev/null +++ b/internal/indexnode/metrics_info.go @@ -0,0 +1,56 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package indexnode + +import ( + "context" + + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/milvus-io/milvus/internal/util/typeutil" + + "github.com/milvus-io/milvus/internal/proto/milvuspb" +) + +// TODO(dragondriver): maybe IndexNode should be an interface so that we can mock it in the test cases +func getSystemInfoMetrics( + ctx context.Context, + req *milvuspb.GetMetricsRequest, + node *IndexNode, +) (*milvuspb.GetMetricsResponse, error) { + // TODO(dragondriver): add more metrics + nodeInfos := metricsinfo.IndexNodeInfos{ + BaseComponentInfos: metricsinfo.BaseComponentInfos{ + Name: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, Params.NodeID), + }, + } + resp, err := metricsinfo.MarshalComponentInfos(nodeInfos) + if err != nil { + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + }, + Response: "", + ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, Params.NodeID), + }, nil + } + + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + }, + Response: resp, + ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, Params.NodeID), + }, nil +} diff --git a/internal/indexnode/metrics_info_test.go b/internal/indexnode/metrics_info_test.go new file mode 100644 index 0000000000..e708891dc3 --- /dev/null +++ b/internal/indexnode/metrics_info_test.go @@ -0,0 +1,22 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package indexnode + +import ( + "testing" + + "github.com/milvus-io/milvus/internal/log" +) + +func TestGetSystemInfoMetrics(t *testing.T) { + log.Info("TestGetSystemInfoMetrics, todo") +} diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto index 6481fdeba4..14237fb6d0 100644 --- a/internal/proto/index_coord.proto +++ b/internal/proto/index_coord.proto @@ -16,6 +16,9 @@ service IndexCoord { rpc GetIndexStates(GetIndexStatesRequest) returns (GetIndexStatesResponse) {} rpc GetIndexFilePaths(GetIndexFilePathsRequest) returns (GetIndexFilePathsResponse){} rpc DropIndex(DropIndexRequest) returns (common.Status) {} + + // https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy + rpc GetMetrics(milvus.GetMetricsRequest) returns (milvus.GetMetricsResponse) {} } service IndexNode { @@ -23,6 +26,9 @@ service IndexNode { rpc GetTimeTickChannel(internal.GetTimeTickChannelRequest) returns(milvus.StringResponse) {} rpc GetStatisticsChannel(internal.GetStatisticsChannelRequest) returns(milvus.StringResponse){} rpc CreateIndex(CreateIndexRequest) returns (common.Status){} + + // https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy + rpc GetMetrics(milvus.GetMetricsRequest) returns (milvus.GetMetricsResponse) {} } message RegisterNodeRequest { diff --git a/internal/proto/indexpb/index_coord.pb.go b/internal/proto/indexpb/index_coord.pb.go index df24ee1617..b93304240f 100644 --- a/internal/proto/indexpb/index_coord.pb.go +++ b/internal/proto/indexpb/index_coord.pb.go @@ -809,67 +809,68 @@ func init() { func init() { proto.RegisterFile("index_coord.proto", fileDescriptor_f9e019eb3fda53c2) } var fileDescriptor_f9e019eb3fda53c2 = []byte{ - // 952 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x56, 0xcd, 0x6e, 0x1c, 0x45, - 0x10, 0xf6, 0xec, 0x64, 0xff, 0x6a, 0x8d, 0x15, 0x37, 0x21, 0x1a, 0x36, 0x44, 0x59, 0x0f, 0x01, - 0x2d, 0x28, 0x59, 0x47, 0x1b, 0x02, 0x27, 0x24, 0xb0, 0x57, 0x58, 0x2b, 0x94, 0xc8, 0xea, 0x58, - 0x1c, 0x90, 0xd0, 0xaa, 0xbd, 0x53, 0xb6, 0x5b, 0x99, 0x3f, 0x4f, 0xf7, 0x46, 0xf8, 0xce, 0x9d, - 0x1b, 0x88, 0x07, 0x41, 0x3c, 0x07, 0x67, 0x5e, 0x82, 0x47, 0x40, 0xdd, 0xd3, 0x33, 0x99, 0x99, - 0x9d, 0x75, 0xd6, 0x98, 0x70, 0xca, 0x6d, 0xaa, 0xba, 0xaa, 0xbf, 0xae, 0xaf, 0x6a, 0xbe, 0x6e, - 0xd8, 0xe6, 0xa1, 0x87, 0x3f, 0xce, 0xe6, 0x51, 0x94, 0x78, 0xa3, 0x38, 0x89, 0x64, 0x44, 0x48, - 0xc0, 0xfd, 0x97, 0x0b, 0x91, 0x5a, 0x23, 0xbd, 0xde, 0xdf, 0x9c, 0x47, 0x41, 0x10, 0x85, 0xa9, - 0xaf, 0xbf, 0xc5, 0x43, 0x89, 0x49, 0xc8, 0x7c, 0x63, 0x6f, 0x16, 0x33, 0xdc, 0x5f, 0x2d, 0x78, - 0x97, 0xe2, 0x29, 0x17, 0x12, 0x93, 0x67, 0x91, 0x87, 0x14, 0xcf, 0x17, 0x28, 0x24, 0x79, 0x04, - 0x37, 0x8e, 0x99, 0x40, 0xc7, 0x1a, 0x58, 0xc3, 0xde, 0xf8, 0x83, 0x51, 0x09, 0xc6, 0xec, 0xff, - 0x54, 0x9c, 0xee, 0x31, 0x81, 0x54, 0x47, 0x92, 0xcf, 0xa1, 0xcd, 0x3c, 0x2f, 0x41, 0x21, 0x9c, - 0xc6, 0x25, 0x49, 0x5f, 0xa7, 0x31, 0x34, 0x0b, 0x26, 0xb7, 0xa1, 0x15, 0x46, 0x1e, 0x4e, 0x27, - 0x8e, 0x3d, 0xb0, 0x86, 0x36, 0x35, 0x96, 0xfb, 0xb3, 0x05, 0xb7, 0xca, 0x27, 0x13, 0x71, 0x14, - 0x0a, 0x24, 0x8f, 0xa1, 0x25, 0x24, 0x93, 0x0b, 0x61, 0x0e, 0x77, 0xa7, 0x16, 0xe7, 0xb9, 0x0e, - 0xa1, 0x26, 0x94, 0xec, 0x41, 0x8f, 0x87, 0x5c, 0xce, 0x62, 0x96, 0xb0, 0x20, 0x3b, 0xe1, 0xce, - 0xa8, 0xc2, 0x9e, 0x21, 0x6a, 0x1a, 0x72, 0x79, 0xa8, 0x03, 0x29, 0xf0, 0xfc, 0xdb, 0xfd, 0x12, - 0xde, 0x3b, 0x40, 0x39, 0x55, 0x1c, 0xab, 0xdd, 0x51, 0x64, 0x64, 0xdd, 0x87, 0x77, 0x34, 0xf3, - 0x7b, 0x0b, 0xee, 0x7b, 0xd3, 0x89, 0x3a, 0x98, 0x3d, 0xb4, 0x69, 0xd9, 0xe9, 0xfe, 0x61, 0x41, - 0x57, 0x27, 0x4f, 0xc3, 0x93, 0x88, 0x3c, 0x81, 0xa6, 0x3a, 0x5a, 0xca, 0xf0, 0xd6, 0xf8, 0x5e, - 0x6d, 0x11, 0xaf, 0xb0, 0x68, 0x1a, 0x4d, 0x5c, 0xd8, 0x2c, 0xee, 0xaa, 0x0b, 0xb1, 0x69, 0xc9, - 0x47, 0x1c, 0x68, 0x6b, 0x3b, 0xa7, 0x34, 0x33, 0xc9, 0x5d, 0x80, 0x74, 0x84, 0x42, 0x16, 0xa0, - 0x73, 0x63, 0x60, 0x0d, 0xbb, 0xb4, 0xab, 0x3d, 0xcf, 0x58, 0x80, 0xaa, 0x15, 0x09, 0x32, 0x11, - 0x85, 0x4e, 0x53, 0x2f, 0x19, 0xcb, 0xfd, 0xc9, 0x82, 0xdb, 0xd5, 0xca, 0xaf, 0xd3, 0x8c, 0x27, - 0x69, 0x12, 0xaa, 0x3e, 0xd8, 0xc3, 0xde, 0xf8, 0xee, 0x68, 0x79, 0x8a, 0x47, 0x39, 0x55, 0xd4, - 0x04, 0xbb, 0x7f, 0x36, 0x80, 0xec, 0x27, 0xc8, 0x24, 0xea, 0xb5, 0x8c, 0xfd, 0x2a, 0x25, 0x56, - 0x0d, 0x25, 0xe5, 0xc2, 0x1b, 0xd5, 0xc2, 0x57, 0x33, 0xe6, 0x40, 0xfb, 0x25, 0x26, 0x82, 0x47, - 0xa1, 0xa6, 0xcb, 0xa6, 0x99, 0x49, 0xee, 0x40, 0x37, 0x40, 0xc9, 0x66, 0x31, 0x93, 0x67, 0x86, - 0xaf, 0x8e, 0x72, 0x1c, 0x32, 0x79, 0xa6, 0xf0, 0x3c, 0x66, 0x16, 0x85, 0xd3, 0x1a, 0xd8, 0x0a, - 0x4f, 0x79, 0xd4, 0xaa, 0x9e, 0x46, 0x79, 0x11, 0x63, 0x36, 0x8d, 0x6d, 0xcd, 0xc2, 0x4e, 0x2d, - 0x75, 0xdf, 0xe2, 0xc5, 0x77, 0xcc, 0x5f, 0xe0, 0x21, 0xe3, 0x09, 0x05, 0x95, 0x95, 0x4e, 0x23, - 0x99, 0x98, 0xb2, 0xb3, 0x4d, 0x3a, 0xeb, 0x6e, 0xd2, 0xd3, 0x69, 0x66, 0xa6, 0x7f, 0x6b, 0xc0, - 0x76, 0x4a, 0xd2, 0xff, 0x46, 0x69, 0x99, 0x9b, 0xe6, 0x6b, 0xb8, 0x69, 0xfd, 0x17, 0xdc, 0xb4, - 0xff, 0x15, 0x37, 0x01, 0x90, 0x22, 0x35, 0xd7, 0x99, 0xf8, 0x35, 0x7e, 0x5b, 0xf7, 0x2b, 0x70, - 0xb2, 0x9f, 0xec, 0x1b, 0xee, 0xa3, 0x66, 0xe3, 0x6a, 0x0a, 0xf3, 0x8b, 0x05, 0xdb, 0xa5, 0x7c, - 0xad, 0x34, 0x6f, 0xea, 0xc0, 0x64, 0x08, 0x37, 0x53, 0x96, 0x4f, 0xb8, 0x8f, 0xa6, 0x9d, 0xb6, - 0x6e, 0xe7, 0x16, 0x2f, 0x55, 0xa1, 0x0e, 0xf6, 0x7e, 0x4d, 0x6d, 0xd7, 0x61, 0x74, 0x02, 0x50, - 0x80, 0x4d, 0x75, 0xe4, 0xa3, 0x95, 0x3a, 0x52, 0x24, 0x84, 0x76, 0x4f, 0xf2, 0x83, 0xfd, 0xd5, - 0x30, 0x9a, 0xfc, 0x14, 0x25, 0x5b, 0x6b, 0xec, 0x73, 0xdd, 0x6e, 0x5c, 0x49, 0xb7, 0xef, 0x41, - 0xef, 0x84, 0x71, 0x7f, 0x66, 0xf4, 0xd5, 0xd6, 0xbf, 0x0b, 0x28, 0x17, 0xd5, 0x1e, 0xf2, 0x05, - 0xd8, 0x09, 0x9e, 0x6b, 0x91, 0x59, 0x51, 0xc8, 0xd2, 0x6f, 0x4a, 0x55, 0x46, 0x6d, 0x17, 0x9a, - 0x75, 0x5d, 0x20, 0x3b, 0xb0, 0x19, 0xb0, 0xe4, 0xc5, 0xcc, 0x43, 0x1f, 0x25, 0x7a, 0x4e, 0x6b, - 0x60, 0x0d, 0x3b, 0xb4, 0xa7, 0x7c, 0x93, 0xd4, 0x55, 0xb8, 0x8c, 0xdb, 0xc5, 0xcb, 0xb8, 0x28, - 0x83, 0x9d, 0xb2, 0x0c, 0xf6, 0xa1, 0x93, 0xe0, 0xfc, 0x62, 0xee, 0xa3, 0xe7, 0x74, 0xf5, 0x86, - 0xb9, 0xed, 0x3e, 0x80, 0x9b, 0x93, 0x24, 0x8a, 0x4b, 0xd2, 0x52, 0xd0, 0x05, 0xab, 0xa4, 0x0b, - 0xe3, 0xbf, 0x9b, 0x00, 0x3a, 0x74, 0x5f, 0xbd, 0x6f, 0x48, 0x0c, 0xe4, 0x00, 0xe5, 0x7e, 0x14, - 0xc4, 0x51, 0x88, 0xa1, 0x4c, 0xef, 0x1d, 0xf2, 0x68, 0xc5, 0x95, 0xbd, 0x1c, 0x6a, 0x00, 0xfb, - 0x1f, 0xaf, 0xc8, 0xa8, 0x84, 0xbb, 0x1b, 0x24, 0xd0, 0x88, 0x47, 0x3c, 0xc0, 0x23, 0x3e, 0x7f, - 0xb1, 0x7f, 0xc6, 0xc2, 0x10, 0xfd, 0xcb, 0x10, 0x2b, 0xa1, 0x19, 0xe2, 0x87, 0xe5, 0x0c, 0x63, - 0x3c, 0x97, 0x09, 0x0f, 0x4f, 0xb3, 0xa1, 0x77, 0x37, 0xc8, 0x39, 0xdc, 0x3a, 0x40, 0x8d, 0xce, - 0x85, 0xe4, 0x73, 0x91, 0x01, 0x8e, 0x57, 0x03, 0x2e, 0x05, 0x5f, 0x11, 0xf2, 0x07, 0x80, 0x57, - 0x53, 0x44, 0xd6, 0x9b, 0xb2, 0x65, 0x02, 0xab, 0x61, 0xf9, 0xf6, 0x1c, 0xb6, 0xca, 0xcf, 0x04, - 0xf2, 0x49, 0x5d, 0x6e, 0xed, 0x23, 0xaa, 0xff, 0xe9, 0x3a, 0xa1, 0x39, 0x54, 0x02, 0xdb, 0x4b, - 0x82, 0x42, 0x1e, 0x5c, 0xb6, 0x45, 0x55, 0x53, 0xfb, 0x0f, 0xd7, 0x8c, 0xce, 0x31, 0x0f, 0xa1, - 0x9b, 0x8f, 0x33, 0xb9, 0x5f, 0x97, 0x5d, 0x9d, 0xf6, 0xfe, 0x65, 0x52, 0xe6, 0x6e, 0x8c, 0x7f, - 0xb7, 0x8d, 0xfc, 0xa8, 0x07, 0xee, 0xdb, 0x89, 0x7f, 0x03, 0x13, 0x7f, 0x04, 0xbd, 0xc2, 0x93, - 0x91, 0xd4, 0xce, 0xf2, 0xf2, 0x9b, 0xf2, 0x35, 0x7d, 0xdb, 0xfb, 0xec, 0xfb, 0xf1, 0x29, 0x97, - 0x67, 0x8b, 0x63, 0xb5, 0xb2, 0x9b, 0x86, 0x3e, 0xe4, 0x91, 0xf9, 0xda, 0xcd, 0x0a, 0xd8, 0xd5, - 0xd9, 0xbb, 0x1a, 0x25, 0x3e, 0x3e, 0x6e, 0x69, 0xf3, 0xf1, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, - 0xfc, 0x30, 0x24, 0x6f, 0xc7, 0x0d, 0x00, 0x00, + // 975 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x56, 0x5f, 0x6f, 0x1b, 0x45, + 0x10, 0xcf, 0xf9, 0x12, 0x3b, 0x1e, 0x87, 0xa8, 0x59, 0x4a, 0x75, 0xb8, 0x54, 0x75, 0x8e, 0x52, + 0x0c, 0x6a, 0x9d, 0xca, 0xa5, 0xf0, 0x84, 0x04, 0x89, 0x45, 0x64, 0xa1, 0x54, 0xd1, 0x36, 0xe2, + 0x01, 0x09, 0x59, 0x1b, 0xdf, 0x24, 0x59, 0xf5, 0xfe, 0xe5, 0x76, 0x5d, 0x91, 0x77, 0xde, 0x79, + 0x2b, 0xe2, 0x93, 0xf0, 0x39, 0xfa, 0xcc, 0x97, 0x41, 0xbb, 0xb7, 0x77, 0xb9, 0x3b, 0x9f, 0x53, + 0x87, 0x50, 0x78, 0xe1, 0xed, 0x66, 0xf6, 0x37, 0x33, 0x3b, 0xbf, 0x9d, 0xfd, 0xdd, 0xc2, 0x16, + 0x0f, 0x3d, 0xfc, 0x79, 0x32, 0x8d, 0xa2, 0xc4, 0x1b, 0xc4, 0x49, 0x24, 0x23, 0x42, 0x02, 0xee, + 0xbf, 0x9a, 0x89, 0xd4, 0x1a, 0xe8, 0xf5, 0xee, 0xc6, 0x34, 0x0a, 0x82, 0x28, 0x4c, 0x7d, 0xdd, + 0x4d, 0x1e, 0x4a, 0x4c, 0x42, 0xe6, 0x1b, 0x7b, 0xa3, 0x18, 0xe1, 0xfe, 0x66, 0xc1, 0xfb, 0x14, + 0x4f, 0xb9, 0x90, 0x98, 0x3c, 0x8f, 0x3c, 0xa4, 0x78, 0x3e, 0x43, 0x21, 0xc9, 0x13, 0x58, 0x3d, + 0x66, 0x02, 0x1d, 0xab, 0x67, 0xf5, 0x3b, 0xc3, 0x8f, 0x06, 0xa5, 0x32, 0x26, 0xff, 0x81, 0x38, + 0xdd, 0x65, 0x02, 0xa9, 0x46, 0x92, 0x2f, 0xa1, 0xc5, 0x3c, 0x2f, 0x41, 0x21, 0x9c, 0xc6, 0x15, + 0x41, 0xdf, 0xa6, 0x18, 0x9a, 0x81, 0xc9, 0x1d, 0x68, 0x86, 0x91, 0x87, 0xe3, 0x91, 0x63, 0xf7, + 0xac, 0xbe, 0x4d, 0x8d, 0xe5, 0xfe, 0x6a, 0xc1, 0xed, 0xf2, 0xce, 0x44, 0x1c, 0x85, 0x02, 0xc9, + 0x53, 0x68, 0x0a, 0xc9, 0xe4, 0x4c, 0x98, 0xcd, 0xdd, 0xad, 0xad, 0xf3, 0x42, 0x43, 0xa8, 0x81, + 0x92, 0x5d, 0xe8, 0xf0, 0x90, 0xcb, 0x49, 0xcc, 0x12, 0x16, 0x64, 0x3b, 0xdc, 0x1e, 0x54, 0xd8, + 0x33, 0x44, 0x8d, 0x43, 0x2e, 0x0f, 0x35, 0x90, 0x02, 0xcf, 0xbf, 0xdd, 0xaf, 0xe1, 0x83, 0x7d, + 0x94, 0x63, 0xc5, 0xb1, 0xca, 0x8e, 0x22, 0x23, 0xeb, 0x01, 0xbc, 0xa7, 0x99, 0xdf, 0x9d, 0x71, + 0xdf, 0x1b, 0x8f, 0xd4, 0xc6, 0xec, 0xbe, 0x4d, 0xcb, 0x4e, 0xf7, 0x0f, 0x0b, 0xda, 0x3a, 0x78, + 0x1c, 0x9e, 0x44, 0xe4, 0x19, 0xac, 0xa9, 0xad, 0xa5, 0x0c, 0x6f, 0x0e, 0xef, 0xd7, 0x36, 0x71, + 0x59, 0x8b, 0xa6, 0x68, 0xe2, 0xc2, 0x46, 0x31, 0xab, 0x6e, 0xc4, 0xa6, 0x25, 0x1f, 0x71, 0xa0, + 0xa5, 0xed, 0x9c, 0xd2, 0xcc, 0x24, 0xf7, 0x00, 0xd2, 0x11, 0x0a, 0x59, 0x80, 0xce, 0x6a, 0xcf, + 0xea, 0xb7, 0x69, 0x5b, 0x7b, 0x9e, 0xb3, 0x00, 0xd5, 0x51, 0x24, 0xc8, 0x44, 0x14, 0x3a, 0x6b, + 0x7a, 0xc9, 0x58, 0xee, 0x2f, 0x16, 0xdc, 0xa9, 0x76, 0x7e, 0x93, 0xc3, 0x78, 0x96, 0x06, 0xa1, + 0x3a, 0x07, 0xbb, 0xdf, 0x19, 0xde, 0x1b, 0xcc, 0x4f, 0xf1, 0x20, 0xa7, 0x8a, 0x1a, 0xb0, 0xfb, + 0xa6, 0x01, 0x64, 0x2f, 0x41, 0x26, 0x51, 0xaf, 0x65, 0xec, 0x57, 0x29, 0xb1, 0x6a, 0x28, 0x29, + 0x37, 0xde, 0xa8, 0x36, 0xbe, 0x98, 0x31, 0x07, 0x5a, 0xaf, 0x30, 0x11, 0x3c, 0x0a, 0x35, 0x5d, + 0x36, 0xcd, 0x4c, 0x72, 0x17, 0xda, 0x01, 0x4a, 0x36, 0x89, 0x99, 0x3c, 0x33, 0x7c, 0xad, 0x2b, + 0xc7, 0x21, 0x93, 0x67, 0xaa, 0x9e, 0xc7, 0xcc, 0xa2, 0x70, 0x9a, 0x3d, 0x5b, 0xd5, 0x53, 0x1e, + 0xb5, 0xaa, 0xa7, 0x51, 0x5e, 0xc4, 0x98, 0x4d, 0x63, 0x4b, 0xb3, 0xb0, 0x5d, 0x4b, 0xdd, 0xf7, + 0x78, 0xf1, 0x03, 0xf3, 0x67, 0x78, 0xc8, 0x78, 0x42, 0x41, 0x45, 0xa5, 0xd3, 0x48, 0x46, 0xa6, + 0xed, 0x2c, 0xc9, 0xfa, 0xb2, 0x49, 0x3a, 0x3a, 0xcc, 0xcc, 0xf4, 0xef, 0x0d, 0xd8, 0x4a, 0x49, + 0xfa, 0xd7, 0x28, 0x2d, 0x73, 0xb3, 0xf6, 0x16, 0x6e, 0x9a, 0xff, 0x04, 0x37, 0xad, 0xbf, 0xc5, + 0x4d, 0x00, 0xa4, 0x48, 0xcd, 0x4d, 0x26, 0x7e, 0x89, 0x6b, 0xeb, 0x7e, 0x03, 0x4e, 0x76, 0xc9, + 0xbe, 0xe3, 0x3e, 0x6a, 0x36, 0xae, 0xa7, 0x30, 0xaf, 0x2d, 0xd8, 0x2a, 0xc5, 0x6b, 0xa5, 0x79, + 0x57, 0x1b, 0x26, 0x7d, 0xb8, 0x95, 0xb2, 0x7c, 0xc2, 0x7d, 0x34, 0xc7, 0x69, 0xeb, 0xe3, 0xdc, + 0xe4, 0xa5, 0x2e, 0xd4, 0xc6, 0x3e, 0xac, 0xe9, 0xed, 0x26, 0x8c, 0x8e, 0x00, 0x0a, 0x65, 0x53, + 0x1d, 0xf9, 0x64, 0xa1, 0x8e, 0x14, 0x09, 0xa1, 0xed, 0x93, 0x7c, 0x63, 0x7f, 0x36, 0x8c, 0x26, + 0x1f, 0xa0, 0x64, 0x4b, 0x8d, 0x7d, 0xae, 0xdb, 0x8d, 0x6b, 0xe9, 0xf6, 0x7d, 0xe8, 0x9c, 0x30, + 0xee, 0x4f, 0x8c, 0xbe, 0xda, 0xfa, 0xba, 0x80, 0x72, 0x51, 0xed, 0x21, 0x5f, 0x81, 0x9d, 0xe0, + 0xb9, 0x16, 0x99, 0x05, 0x8d, 0xcc, 0x5d, 0x53, 0xaa, 0x22, 0x6a, 0x4f, 0x61, 0xad, 0xee, 0x14, + 0xc8, 0x36, 0x6c, 0x04, 0x2c, 0x79, 0x39, 0xf1, 0xd0, 0x47, 0x89, 0x9e, 0xd3, 0xec, 0x59, 0xfd, + 0x75, 0xda, 0x51, 0xbe, 0x51, 0xea, 0x2a, 0xfc, 0x8c, 0x5b, 0xc5, 0x9f, 0x71, 0x51, 0x06, 0xd7, + 0xcb, 0x32, 0xd8, 0x85, 0xf5, 0x04, 0xa7, 0x17, 0x53, 0x1f, 0x3d, 0xa7, 0xad, 0x13, 0xe6, 0xb6, + 0xfb, 0x08, 0x6e, 0x8d, 0x92, 0x28, 0x2e, 0x49, 0x4b, 0x41, 0x17, 0xac, 0x92, 0x2e, 0x0c, 0xdf, + 0x34, 0x01, 0x34, 0x74, 0x4f, 0xbd, 0x6f, 0x48, 0x0c, 0x64, 0x1f, 0xe5, 0x5e, 0x14, 0xc4, 0x51, + 0x88, 0xa1, 0x4c, 0xff, 0x3b, 0xe4, 0xc9, 0x82, 0x5f, 0xf6, 0x3c, 0xd4, 0x14, 0xec, 0x3e, 0x5c, + 0x10, 0x51, 0x81, 0xbb, 0x2b, 0x24, 0xd0, 0x15, 0x8f, 0x78, 0x80, 0x47, 0x7c, 0xfa, 0x72, 0xef, + 0x8c, 0x85, 0x21, 0xfa, 0x57, 0x55, 0xac, 0x40, 0xb3, 0x8a, 0x1f, 0x97, 0x23, 0x8c, 0xf1, 0x42, + 0x26, 0x3c, 0x3c, 0xcd, 0x86, 0xde, 0x5d, 0x21, 0xe7, 0x70, 0x7b, 0x1f, 0x75, 0x75, 0x2e, 0x24, + 0x9f, 0x8a, 0xac, 0xe0, 0x70, 0x71, 0xc1, 0x39, 0xf0, 0x35, 0x4b, 0xfe, 0x04, 0x70, 0x39, 0x45, + 0x64, 0xb9, 0x29, 0x9b, 0x27, 0xb0, 0x0a, 0xcb, 0xd3, 0x73, 0xd8, 0x2c, 0x3f, 0x13, 0xc8, 0x67, + 0x75, 0xb1, 0xb5, 0x8f, 0xa8, 0xee, 0xe7, 0xcb, 0x40, 0xf3, 0x52, 0x09, 0x6c, 0xcd, 0x09, 0x0a, + 0x79, 0x74, 0x55, 0x8a, 0xaa, 0xa6, 0x76, 0x1f, 0x2f, 0x89, 0xce, 0x6b, 0x1e, 0x42, 0x3b, 0x1f, + 0x67, 0xf2, 0xa0, 0x2e, 0xba, 0x3a, 0xed, 0xdd, 0xab, 0xa4, 0xcc, 0x5d, 0x21, 0x13, 0x80, 0x7d, + 0x94, 0x07, 0x28, 0x13, 0x3e, 0x15, 0xe4, 0x61, 0xed, 0x21, 0x5e, 0x02, 0xb2, 0xa4, 0x9f, 0xbe, + 0x15, 0x97, 0x6d, 0x79, 0xf8, 0x7a, 0xd5, 0xe8, 0x9b, 0x7a, 0x41, 0xff, 0x7f, 0xa5, 0xde, 0xc1, + 0x95, 0x3a, 0x82, 0x4e, 0xe1, 0x4d, 0x4a, 0x6a, 0x2f, 0xcb, 0xfc, 0xa3, 0xf5, 0xbf, 0x1e, 0x8c, + 0xdd, 0x2f, 0x7e, 0x1c, 0x9e, 0x72, 0x79, 0x36, 0x3b, 0x56, 0xa5, 0x77, 0x52, 0xe4, 0x63, 0x1e, + 0x99, 0xaf, 0x9d, 0x8c, 0xa1, 0x1d, 0x9d, 0x69, 0x47, 0xb7, 0x11, 0x1f, 0x1f, 0x37, 0xb5, 0xf9, + 0xf4, 0xaf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x75, 0x9d, 0x20, 0xf1, 0x89, 0x0e, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -891,6 +892,8 @@ type IndexCoordClient interface { GetIndexStates(ctx context.Context, in *GetIndexStatesRequest, opts ...grpc.CallOption) (*GetIndexStatesResponse, error) GetIndexFilePaths(ctx context.Context, in *GetIndexFilePathsRequest, opts ...grpc.CallOption) (*GetIndexFilePathsResponse, error) DropIndex(ctx context.Context, in *DropIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) + // https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy + GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest, opts ...grpc.CallOption) (*milvuspb.GetMetricsResponse, error) } type indexCoordClient struct { @@ -964,6 +967,15 @@ func (c *indexCoordClient) DropIndex(ctx context.Context, in *DropIndexRequest, return out, nil } +func (c *indexCoordClient) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest, opts ...grpc.CallOption) (*milvuspb.GetMetricsResponse, error) { + out := new(milvuspb.GetMetricsResponse) + err := c.cc.Invoke(ctx, "/milvus.proto.index.IndexCoord/GetMetrics", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // IndexCoordServer is the server API for IndexCoord service. type IndexCoordServer interface { GetComponentStates(context.Context, *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) @@ -973,6 +985,8 @@ type IndexCoordServer interface { GetIndexStates(context.Context, *GetIndexStatesRequest) (*GetIndexStatesResponse, error) GetIndexFilePaths(context.Context, *GetIndexFilePathsRequest) (*GetIndexFilePathsResponse, error) DropIndex(context.Context, *DropIndexRequest) (*commonpb.Status, error) + // https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy + GetMetrics(context.Context, *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) } // UnimplementedIndexCoordServer can be embedded to have forward compatible implementations. @@ -1000,6 +1014,9 @@ func (*UnimplementedIndexCoordServer) GetIndexFilePaths(ctx context.Context, req func (*UnimplementedIndexCoordServer) DropIndex(ctx context.Context, req *DropIndexRequest) (*commonpb.Status, error) { return nil, status.Errorf(codes.Unimplemented, "method DropIndex not implemented") } +func (*UnimplementedIndexCoordServer) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetMetrics not implemented") +} func RegisterIndexCoordServer(s *grpc.Server, srv IndexCoordServer) { s.RegisterService(&_IndexCoord_serviceDesc, srv) @@ -1131,6 +1148,24 @@ func _IndexCoord_DropIndex_Handler(srv interface{}, ctx context.Context, dec fun return interceptor(ctx, in, info, handler) } +func _IndexCoord_GetMetrics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(milvuspb.GetMetricsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(IndexCoordServer).GetMetrics(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/milvus.proto.index.IndexCoord/GetMetrics", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(IndexCoordServer).GetMetrics(ctx, req.(*milvuspb.GetMetricsRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _IndexCoord_serviceDesc = grpc.ServiceDesc{ ServiceName: "milvus.proto.index.IndexCoord", HandlerType: (*IndexCoordServer)(nil), @@ -1163,6 +1198,10 @@ var _IndexCoord_serviceDesc = grpc.ServiceDesc{ MethodName: "DropIndex", Handler: _IndexCoord_DropIndex_Handler, }, + { + MethodName: "GetMetrics", + Handler: _IndexCoord_GetMetrics_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "index_coord.proto", @@ -1176,6 +1215,8 @@ type IndexNodeClient interface { GetTimeTickChannel(ctx context.Context, in *internalpb.GetTimeTickChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) GetStatisticsChannel(ctx context.Context, in *internalpb.GetStatisticsChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) CreateIndex(ctx context.Context, in *CreateIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) + // https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy + GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest, opts ...grpc.CallOption) (*milvuspb.GetMetricsResponse, error) } type indexNodeClient struct { @@ -1222,12 +1263,23 @@ func (c *indexNodeClient) CreateIndex(ctx context.Context, in *CreateIndexReques return out, nil } +func (c *indexNodeClient) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest, opts ...grpc.CallOption) (*milvuspb.GetMetricsResponse, error) { + out := new(milvuspb.GetMetricsResponse) + err := c.cc.Invoke(ctx, "/milvus.proto.index.IndexNode/GetMetrics", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // IndexNodeServer is the server API for IndexNode service. type IndexNodeServer interface { GetComponentStates(context.Context, *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) GetTimeTickChannel(context.Context, *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) GetStatisticsChannel(context.Context, *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) CreateIndex(context.Context, *CreateIndexRequest) (*commonpb.Status, error) + // https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy + GetMetrics(context.Context, *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) } // UnimplementedIndexNodeServer can be embedded to have forward compatible implementations. @@ -1246,6 +1298,9 @@ func (*UnimplementedIndexNodeServer) GetStatisticsChannel(ctx context.Context, r func (*UnimplementedIndexNodeServer) CreateIndex(ctx context.Context, req *CreateIndexRequest) (*commonpb.Status, error) { return nil, status.Errorf(codes.Unimplemented, "method CreateIndex not implemented") } +func (*UnimplementedIndexNodeServer) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetMetrics not implemented") +} func RegisterIndexNodeServer(s *grpc.Server, srv IndexNodeServer) { s.RegisterService(&_IndexNode_serviceDesc, srv) @@ -1323,6 +1378,24 @@ func _IndexNode_CreateIndex_Handler(srv interface{}, ctx context.Context, dec fu return interceptor(ctx, in, info, handler) } +func _IndexNode_GetMetrics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(milvuspb.GetMetricsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(IndexNodeServer).GetMetrics(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/milvus.proto.index.IndexNode/GetMetrics", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(IndexNodeServer).GetMetrics(ctx, req.(*milvuspb.GetMetricsRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _IndexNode_serviceDesc = grpc.ServiceDesc{ ServiceName: "milvus.proto.index.IndexNode", HandlerType: (*IndexNodeServer)(nil), @@ -1343,6 +1416,10 @@ var _IndexNode_serviceDesc = grpc.ServiceDesc{ MethodName: "CreateIndex", Handler: _IndexNode_CreateIndex_Handler, }, + { + MethodName: "GetMetrics", + Handler: _IndexNode_GetMetrics_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "index_coord.proto", diff --git a/internal/types/types.go b/internal/types/types.go index f9600929df..0af5743b5c 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -68,6 +68,7 @@ type IndexNode interface { TimeTickProvider CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) + GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) } type IndexCoord interface { diff --git a/internal/util/metricsinfo/metric_type.go b/internal/util/metricsinfo/metric_type.go index a0b9ddf61a..c675bae456 100644 --- a/internal/util/metricsinfo/metric_type.go +++ b/internal/util/metricsinfo/metric_type.go @@ -14,6 +14,8 @@ package metricsinfo import ( "encoding/json" "fmt" + + "github.com/milvus-io/milvus/internal/proto/milvuspb" ) const ( @@ -21,6 +23,7 @@ const ( SystemInfoMetrics = "system_info" ) +// ParseMetricType returns the metric type of req func ParseMetricType(req string) (string, error) { m := make(map[string]interface{}) err := json.Unmarshal([]byte(req), &m) @@ -33,3 +36,17 @@ func ParseMetricType(req string) (string, error) { } return metricType.(string), nil } + +// ConstructRequestByMetricType constructs a request according to the metric type +func ConstructRequestByMetricType(metricType string) (*milvuspb.GetMetricsRequest, error) { + m := make(map[string]interface{}) + m[MetricTypeKey] = metricType + binary, err := json.Marshal(m) + if err != nil { + return nil, fmt.Errorf("failed to construct request by metric type %s: %s", metricType, err.Error()) + } + return &milvuspb.GetMetricsRequest{ + Base: nil, + Request: string(binary), + }, nil +} diff --git a/internal/util/metricsinfo/metric_type_test.go b/internal/util/metricsinfo/metric_type_test.go index cee603fbf9..fe60e3022a 100644 --- a/internal/util/metricsinfo/metric_type_test.go +++ b/internal/util/metricsinfo/metric_type_test.go @@ -15,6 +15,9 @@ import ( "encoding/json" "testing" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" + "github.com/stretchr/testify/assert" ) @@ -52,8 +55,26 @@ func TestParseMetricType(t *testing.T) { got, err := ParseMetricType(test.s) assert.Equal(t, test.errIsNil, err == nil) if test.errIsNil && test.want != got { - t.Errorf("ParseMetricType(%s) = %s", test.s, test.want) + t.Errorf("ParseMetricType(%s) = %s, but got: %s", test.s, test.want, got) } } } + +func TestConstructRequestByMetricType(t *testing.T) { + cases := []struct { + metricType string + errIsNil bool + }{ + {SystemInfoMetrics, true}, + } + + for _, test := range cases { + got, err := ConstructRequestByMetricType(test.metricType) + assert.Equal(t, test.errIsNil, err == nil) + if test.errIsNil { + log.Info("TestConstructRequestByMetricType", + zap.String("request", got.Request)) + } + } +} diff --git a/internal/util/metricsinfo/metrics_info.go b/internal/util/metricsinfo/metrics_info.go index e044eed70b..92c03f8626 100644 --- a/internal/util/metricsinfo/metrics_info.go +++ b/internal/util/metricsinfo/metrics_info.go @@ -55,3 +55,15 @@ type ProxyInfos struct { BaseComponentInfos // TODO(dragondriver): add more detail metrics } + +// IndexNodeInfos implements ComponentInfos +type IndexNodeInfos struct { + BaseComponentInfos + // TODO(dragondriver): add more detail metrics +} + +// IndexCoordInfos implements ComponentInfos +type IndexCoordInfos struct { + BaseComponentInfos + // TODO(dragondriver): add more detail metrics +} diff --git a/internal/util/metricsinfo/metrics_info_test.go b/internal/util/metricsinfo/metrics_info_test.go index 4c13795d7c..e4b3e62a98 100644 --- a/internal/util/metricsinfo/metrics_info_test.go +++ b/internal/util/metricsinfo/metrics_info_test.go @@ -67,3 +67,35 @@ func TestQueryCoordInfos_Codec(t *testing.T) { assert.Equal(t, nil, err) assert.Equal(t, infos1.Name, infos2.Name) } + +func TestIndexNodeInfos_Codec(t *testing.T) { + infos1 := IndexNodeInfos{ + BaseComponentInfos: BaseComponentInfos{ + Name: ConstructComponentName(typeutil.IndexNodeRole, 1), + }, + } + s, err := MarshalComponentInfos(infos1) + assert.Equal(t, nil, err) + log.Info("TestIndexNodeInfos_Codec", + zap.String("marshaled_result", s)) + var infos2 IndexNodeInfos + err = UnmarshalComponentInfos(s, &infos2) + assert.Equal(t, nil, err) + assert.Equal(t, infos1.Name, infos2.Name) +} + +func TestIndexCoordInfos_Codec(t *testing.T) { + infos1 := IndexCoordInfos{ + BaseComponentInfos: BaseComponentInfos{ + Name: ConstructComponentName(typeutil.IndexCoordRole, 1), + }, + } + s, err := MarshalComponentInfos(infos1) + assert.Equal(t, nil, err) + log.Info("TestIndexCoordInfos_Codec", + zap.String("marshaled_result", s)) + var infos2 IndexCoordInfos + err = UnmarshalComponentInfos(s, &infos2) + assert.Equal(t, nil, err) + assert.Equal(t, infos1.Name, infos2.Name) +} diff --git a/internal/util/metricsinfo/topology.go b/internal/util/metricsinfo/topology.go index abb54ce697..59091ac5b9 100644 --- a/internal/util/metricsinfo/topology.go +++ b/internal/util/metricsinfo/topology.go @@ -63,6 +63,18 @@ type QueryCoordTopology struct { Connections ConnTopology `json:"connections"` } +// IndexClusterTopology shows the topology between IndexCoord and IndexNodes +type IndexClusterTopology struct { + Self IndexCoordInfos `json:"self"` + ConnectedNodes []IndexNodeInfos `json:"connected_nodes"` +} + +// IndexCoordTopology shows the whole metrics of index cluster +type IndexCoordTopology struct { + Cluster IndexClusterTopology `json:"cluster"` + Connections ConnTopology `json:"connections"` +} + type ConnectionType string const ( diff --git a/internal/util/metricsinfo/topology_test.go b/internal/util/metricsinfo/topology_test.go index 5b26459a2e..c6073eadd8 100644 --- a/internal/util/metricsinfo/topology_test.go +++ b/internal/util/metricsinfo/topology_test.go @@ -38,7 +38,7 @@ func TestConstructComponentName(t *testing.T) { } } -func TestQueryCoordTopology_Codec(t *testing.T) { +func TestQueryClusterTopology_Codec(t *testing.T) { topology1 := QueryClusterTopology{ Self: QueryCoordInfos{ BaseComponentInfos: BaseComponentInfos{ @@ -60,7 +60,7 @@ func TestQueryCoordTopology_Codec(t *testing.T) { } s, err := MarshalTopology(topology1) assert.Equal(t, nil, err) - log.Info("TestQueryCoordTopology_Codec", + log.Info("TestQueryClusterTopology_Codec", zap.String("marshaled_result", s)) var topology2 QueryClusterTopology err = UnmarshalTopology(s, &topology2) @@ -72,6 +72,134 @@ func TestQueryCoordTopology_Codec(t *testing.T) { } } +func TestQueryCoordTopology_Codec(t *testing.T) { + topology1 := QueryCoordTopology{ + Cluster: QueryClusterTopology{ + Self: QueryCoordInfos{ + BaseComponentInfos: BaseComponentInfos{ + Name: ConstructComponentName(typeutil.QueryCoordRole, 1), + }, + }, + ConnectedNodes: []QueryNodeInfos{ + { + BaseComponentInfos: BaseComponentInfos{ + Name: ConstructComponentName(typeutil.QueryNodeRole, 1), + }, + }, + { + BaseComponentInfos: BaseComponentInfos{ + Name: ConstructComponentName(typeutil.QueryNodeRole, 2), + }, + }, + }, + }, + Connections: ConnTopology{ + Name: ConstructComponentName(typeutil.QueryCoordRole, 1), + ConnectedComponents: []string{ + ConstructComponentName(typeutil.RootCoordRole, 1), + }, + }, + } + s, err := MarshalTopology(topology1) + assert.Equal(t, nil, err) + log.Info("TestQueryCoordTopology_Codec", + zap.String("marshaled_result", s)) + var topology2 QueryCoordTopology + err = UnmarshalTopology(s, &topology2) + assert.Equal(t, nil, err) + assert.Equal(t, topology1.Cluster.Self, topology2.Cluster.Self) + assert.Equal(t, len(topology1.Cluster.ConnectedNodes), len(topology2.Cluster.ConnectedNodes)) + for i := range topology1.Cluster.ConnectedNodes { + assert.Equal(t, topology1.Cluster.ConnectedNodes[i], topology2.Cluster.ConnectedNodes[i]) + } + assert.Equal(t, topology1.Connections.Name, topology2.Connections.Name) + assert.Equal(t, len(topology1.Connections.ConnectedComponents), len(topology1.Connections.ConnectedComponents)) + for i := range topology1.Connections.ConnectedComponents { + assert.Equal(t, topology1.Connections.ConnectedComponents[i], topology2.Connections.ConnectedComponents[i]) + } +} + +func TestIndexClusterTopology_Codec(t *testing.T) { + topology1 := IndexClusterTopology{ + Self: IndexCoordInfos{ + BaseComponentInfos: BaseComponentInfos{ + Name: ConstructComponentName(typeutil.IndexCoordRole, 1), + }, + }, + ConnectedNodes: []IndexNodeInfos{ + { + BaseComponentInfos: BaseComponentInfos{ + Name: ConstructComponentName(typeutil.IndexNodeRole, 1), + }, + }, + { + BaseComponentInfos: BaseComponentInfos{ + Name: ConstructComponentName(typeutil.IndexNodeRole, 2), + }, + }, + }, + } + s, err := MarshalTopology(topology1) + assert.Equal(t, nil, err) + log.Info("TestIndexClusterTopology_Codec", + zap.String("marshaled_result", s)) + var topology2 IndexClusterTopology + err = UnmarshalTopology(s, &topology2) + assert.Equal(t, nil, err) + assert.Equal(t, topology1.Self, topology2.Self) + assert.Equal(t, len(topology1.ConnectedNodes), len(topology2.ConnectedNodes)) + for i := range topology1.ConnectedNodes { + assert.Equal(t, topology1.ConnectedNodes[i], topology2.ConnectedNodes[i]) + } +} + +func TestIndexCoordTopology_Codec(t *testing.T) { + topology1 := IndexCoordTopology{ + Cluster: IndexClusterTopology{ + Self: IndexCoordInfos{ + BaseComponentInfos: BaseComponentInfos{ + Name: ConstructComponentName(typeutil.IndexCoordRole, 1), + }, + }, + ConnectedNodes: []IndexNodeInfos{ + { + BaseComponentInfos: BaseComponentInfos{ + Name: ConstructComponentName(typeutil.IndexNodeRole, 1), + }, + }, + { + BaseComponentInfos: BaseComponentInfos{ + Name: ConstructComponentName(typeutil.IndexNodeRole, 2), + }, + }, + }, + }, + Connections: ConnTopology{ + Name: ConstructComponentName(typeutil.IndexCoordRole, 1), + ConnectedComponents: []string{ + ConstructComponentName(typeutil.RootCoordRole, 1), + }, + }, + } + s, err := MarshalTopology(topology1) + assert.Equal(t, nil, err) + log.Info("TestIndexCoordTopology_Codec", + zap.String("marshaled_result", s)) + var topology2 IndexCoordTopology + err = UnmarshalTopology(s, &topology2) + assert.Equal(t, nil, err) + assert.Equal(t, topology1.Cluster.Self, topology2.Cluster.Self) + assert.Equal(t, len(topology1.Cluster.ConnectedNodes), len(topology2.Cluster.ConnectedNodes)) + for i := range topology1.Cluster.ConnectedNodes { + assert.Equal(t, topology1.Cluster.ConnectedNodes[i], topology2.Cluster.ConnectedNodes[i]) + } + assert.Equal(t, topology1.Connections.Name, topology2.Connections.Name) + assert.Equal(t, len(topology1.Connections.ConnectedComponents), len(topology1.Connections.ConnectedComponents)) + for i := range topology1.Connections.ConnectedComponents { + assert.Equal(t, topology1.Connections.ConnectedComponents[i], topology2.Connections.ConnectedComponents[i]) + } +} + func TestConnTopology_Codec(t *testing.T) { topology1 := ConnTopology{ Name: ConstructComponentName(typeutil.ProxyRole, 1),