From ae3daff5e4979fc8a225a6049c203d608787772b Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Thu, 17 Jun 2021 16:47:57 +0800 Subject: [PATCH] Rename Master to RootCoord (#5830) * rename master_service.go to root_coordinator.go Signed-off-by: yudong.cai * rename service to coordinator under cmd Signed-off-by: yudong.cai * rename service to coord under cmd Signed-off-by: yudong.cai * rename service to coord for metrics Signed-off-by: yudong.cai * rename service to coord for masterservice Signed-off-by: yudong.cai * fix metrics unittest Signed-off-by: yudong.cai * roll back cmd Signed-off-by: yudong.cai --- cmd/distributed/roles/roles.go | 10 +- internal/dataservice/cluster_data_manager.go | 8 +- .../masterservice/client/client.go | 30 +-- .../masterservice/{server.go => service.go} | 96 ++++----- ...{masterservice_test.go => service_test.go} | 43 ++-- internal/masterservice/proxy_node_manager.go | 6 +- .../{master_service.go => root_coord.go} | 132 ++++++------ ...ter_service_test.go => root_coord_test.go} | 14 +- internal/masterservice/timestamp_test.go | 6 +- internal/masterservice/timeticksync.go | 2 +- internal/metrics/metrics.go | 203 +++++++++--------- internal/metrics/metrics_test.go | 10 +- internal/types/types.go | 6 +- internal/util/typeutil/type.go | 16 +- 14 files changed, 286 insertions(+), 296 deletions(-) rename internal/distributed/masterservice/{server.go => service.go} (76%) rename internal/distributed/masterservice/{masterservice_test.go => service_test.go} (96%) rename internal/masterservice/{master_service.go => root_coord.go} (91%) rename internal/masterservice/{master_service_test.go => root_coord_test.go} (99%) diff --git a/cmd/distributed/roles/roles.go b/cmd/distributed/roles/roles.go index 1a4f2ac8e4..cb01e5ad3e 100644 --- a/cmd/distributed/roles/roles.go +++ b/cmd/distributed/roles/roles.go @@ -97,7 +97,7 @@ func (mr *MilvusRoles) Run(localMsg bool) { defer ms.Stop() } - metrics.RegisterMaster() + metrics.RegisterRootCoord() } if mr.EnableProxyNode { @@ -153,7 +153,7 @@ func (mr *MilvusRoles) Run(localMsg bool) { defer qs.Stop() } - metrics.RegisterQueryService() + metrics.RegisterQueryCoord() } if mr.EnableQueryNode { @@ -209,7 +209,7 @@ func (mr *MilvusRoles) Run(localMsg bool) { defer ds.Stop() } - metrics.RegisterDataService() + metrics.RegisterDataCoord() } if mr.EnableDataNode { @@ -264,7 +264,7 @@ func (mr *MilvusRoles) Run(localMsg bool) { defer is.Stop() } - metrics.RegisterIndexService() + metrics.RegisterIndexCoord() } if mr.EnableIndexNode { @@ -314,7 +314,7 @@ func (mr *MilvusRoles) Run(localMsg bool) { defer mss.Stop() } - metrics.RegisterMsgStreamService() + metrics.RegisterMsgStreamCoord() } metrics.ServeHTTP() diff --git a/internal/dataservice/cluster_data_manager.go b/internal/dataservice/cluster_data_manager.go index 5a641e836d..65eedf1558 100644 --- a/internal/dataservice/cluster_data_manager.go +++ b/internal/dataservice/cluster_data_manager.go @@ -117,8 +117,8 @@ func (c *clusterNodeManager) updateCluster(dataNodes []*datapb.DataNodeInfo) *cl offlines = append(offlines, nAddr) } } - metrics.DataServiceDataNodeList.WithLabelValues("online").Set(onCnt) - metrics.DataServiceDataNodeList.WithLabelValues("offline").Set(offCnt) + metrics.DataCoordDataNodeList.WithLabelValues("online").Set(onCnt) + metrics.DataCoordDataNodeList.WithLabelValues("offline").Set(offCnt) return &clusterDeltaChange{ newNodes: newNodes, offlines: offlines, @@ -177,8 +177,8 @@ func (c *clusterNodeManager) updateMetrics() { offCnt++ } } - metrics.DataServiceDataNodeList.WithLabelValues("online").Set(onCnt) - metrics.DataServiceDataNodeList.WithLabelValues("offline").Set(offCnt) + metrics.DataCoordDataNodeList.WithLabelValues("online").Set(onCnt) + metrics.DataCoordDataNodeList.WithLabelValues("offline").Set(offCnt) } func (c *clusterNodeManager) txnSaveNodes(nodes []*datapb.DataNodeInfo, buffer []*datapb.ChannelStatus) error { diff --git a/internal/distributed/masterservice/client/client.go b/internal/distributed/masterservice/client/client.go index bab3b6d20b..2e55ae44ad 100644 --- a/internal/distributed/masterservice/client/client.go +++ b/internal/distributed/masterservice/client/client.go @@ -46,32 +46,32 @@ type GrpcClient struct { sess *sessionutil.Session } -func getMasterServiceAddr(sess *sessionutil.Session) (string, error) { - key := typeutil.MasterServiceRole +func getRootCoordAddr(sess *sessionutil.Session) (string, error) { + key := typeutil.RootCoordRole msess, _, err := sess.GetSessions(key) if err != nil { - log.Debug("MasterServiceClient GetSessions failed", zap.Any("key", key)) + log.Debug("RootCoordClient GetSessions failed", zap.Any("key", key)) return "", err } - log.Debug("MasterServiceClient GetSessions success") + log.Debug("RootCoordClient GetSessions success") ms, ok := msess[key] if !ok { - log.Debug("MasterServiceClient mess key not exist", zap.Any("key", key)) + log.Debug("RootCoordClient mess key not exist", zap.Any("key", key)) return "", fmt.Errorf("number of master service is incorrect, %d", len(msess)) } return ms.Address, nil } -// NewClient create master service client with specified ectd info and timeout +// NewClient create root coordinator client with specified ectd info and timeout // ctx execution control context -// metaRoot is the path in etcd for master registration +// metaRoot is the path in etcd for root coordinator registration // etcdEndpoints are the address list for etcd end points // timeout is default setting for each grpc call func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string, timeout time.Duration) (*GrpcClient, error) { sess := sessionutil.NewSession(ctx, metaRoot, etcdEndpoints) if sess == nil { err := fmt.Errorf("new session error, maybe can not connect to etcd") - log.Debug("MasterServiceClient NewClient failed", zap.Error(err)) + log.Debug("RootCoordClient NewClient failed", zap.Error(err)) return nil, err } @@ -88,11 +88,11 @@ func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string, tim func (c *GrpcClient) connect() error { var err error - getMasterServiceAddrFn := func() error { + getRootCoordAddrFn := func() error { ch := make(chan struct{}, 1) var err error go func() { - c.addr, err = getMasterServiceAddr(c.sess) + c.addr, err = getRootCoordAddr(c.sess) ch <- struct{}{} }() select { @@ -105,13 +105,13 @@ func (c *GrpcClient) connect() error { } return nil } - err = retry.Retry(c.reconnTry, 3*time.Second, getMasterServiceAddrFn) + err = retry.Retry(c.reconnTry, 3*time.Second, getRootCoordAddrFn) if err != nil { - log.Debug("MasterServiceClient getMasterServiceAddr failed", zap.Error(err)) + log.Debug("RootCoordClient getRootCoordAddr failed", zap.Error(err)) return err } connectGrpcFunc := func() error { - log.Debug("MasterServiceClient try reconnect ", zap.String("address", c.addr)) + log.Debug("RootCoordClient try reconnect ", zap.String("address", c.addr)) ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() var conn *grpc.ClientConn @@ -139,10 +139,10 @@ func (c *GrpcClient) connect() error { err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc) if err != nil { - log.Debug("MasterServiceClient try reconnect failed", zap.Error(err)) + log.Debug("RootCoordClient try reconnect failed", zap.Error(err)) return err } - log.Debug("MasterServiceClient try reconnect success") + log.Debug("RootCoordClient try reconnect success") c.grpcClient = masterpb.NewMasterServiceClient(c.conn) return nil } diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/service.go similarity index 76% rename from internal/distributed/masterservice/server.go rename to internal/distributed/masterservice/service.go index b1a7e82c44..0d4feeea16 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/service.go @@ -44,9 +44,9 @@ import ( // Server grpc wrapper type Server struct { - masterService types.MasterComponent - grpcServer *grpc.Server - grpcErrChan chan error + rootCoord types.MasterComponent + grpcServer *grpc.Server + grpcErrChan chan error wg sync.WaitGroup @@ -57,9 +57,9 @@ type Server struct { indexService types.IndexService queryService types.QueryService - newIndexServiceClient func(string, []string, time.Duration) types.IndexService - newDataServiceClient func(string, []string, time.Duration) types.DataService - newQueryServiceClient func(string, []string, time.Duration) types.QueryService + newIndexCoordClient func(string, []string, time.Duration) types.IndexService + newDataCoordClient func(string, []string, time.Duration) types.DataService + newQueryCoordClient func(string, []string, time.Duration) types.QueryService closer io.Closer } @@ -73,7 +73,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) } s.setClient() var err error - s.masterService, err = cms.NewCore(s.ctx, factory) + s.rootCoord, err = cms.NewCore(s.ctx, factory) if err != nil { return nil, err } @@ -83,7 +83,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) func (s *Server) setClient() { ctx := context.Background() - s.newDataServiceClient = func(etcdMetaRoot string, etcdEndpoints []string, timeout time.Duration) types.DataService { + s.newDataCoordClient = func(etcdMetaRoot string, etcdEndpoints []string, timeout time.Duration) types.DataService { dsClient := dsc.NewClient(etcdMetaRoot, etcdEndpoints, timeout) if err := dsClient.Init(); err != nil { panic(err) @@ -96,7 +96,7 @@ func (s *Server) setClient() { } return dsClient } - s.newIndexServiceClient = func(metaRootPath string, etcdEndpoints []string, timeout time.Duration) types.IndexService { + s.newIndexCoordClient = func(metaRootPath string, etcdEndpoints []string, timeout time.Duration) types.IndexService { isClient := isc.NewClient(metaRootPath, etcdEndpoints, timeout) if err := isClient.Init(); err != nil { panic(err) @@ -106,7 +106,7 @@ func (s *Server) setClient() { } return isClient } - s.newQueryServiceClient = func(metaRootPath string, etcdEndpoints []string, timeout time.Duration) types.QueryService { + s.newQueryCoordClient = func(metaRootPath string, etcdEndpoints []string, timeout time.Duration) types.QueryService { qsClient, err := qsc.NewClient(metaRootPath, etcdEndpoints, timeout) if err != nil { panic(err) @@ -141,12 +141,12 @@ func (s *Server) init() error { ctx := context.Background() - closer := trace.InitTracing("master_service") + closer := trace.InitTracing("root_coord") s.closer = closer log.Debug("init params done") - err := s.masterService.Register() + err := s.rootCoord.Register() if err != nil { return err } @@ -156,9 +156,9 @@ func (s *Server) init() error { return err } - s.masterService.UpdateStateCode(internalpb.StateCode_Initializing) + s.rootCoord.UpdateStateCode(internalpb.StateCode_Initializing) log.Debug("MasterService", zap.Any("State", internalpb.StateCode_Initializing)) - s.masterService.SetNewProxyClient( + s.rootCoord.SetNewProxyClient( func(s *sessionutil.Session) (types.ProxyNode, error) { cli := pnc.NewClient(s.Address, 3*time.Second) if err := cli.Init(); err != nil { @@ -171,32 +171,32 @@ func (s *Server) init() error { }, ) - if s.newDataServiceClient != nil { + if s.newDataCoordClient != nil { log.Debug("MasterService start to create DataService client") - dataService := s.newDataServiceClient(cms.Params.MetaRootPath, cms.Params.EtcdEndpoints, 3*time.Second) - if err := s.masterService.SetDataService(ctx, dataService); err != nil { + dataService := s.newDataCoordClient(cms.Params.MetaRootPath, cms.Params.EtcdEndpoints, 3*time.Second) + if err := s.rootCoord.SetDataCoord(ctx, dataService); err != nil { panic(err) } s.dataService = dataService } - if s.newIndexServiceClient != nil { + if s.newIndexCoordClient != nil { log.Debug("MasterService start to create IndexService client") - indexService := s.newIndexServiceClient(cms.Params.MetaRootPath, cms.Params.EtcdEndpoints, 3*time.Second) - if err := s.masterService.SetIndexService(indexService); err != nil { + indexService := s.newIndexCoordClient(cms.Params.MetaRootPath, cms.Params.EtcdEndpoints, 3*time.Second) + if err := s.rootCoord.SetIndexCoord(indexService); err != nil { panic(err) } s.indexService = indexService } - if s.newQueryServiceClient != nil { + if s.newQueryCoordClient != nil { log.Debug("MasterService start to create QueryService client") - queryService := s.newQueryServiceClient(cms.Params.MetaRootPath, cms.Params.EtcdEndpoints, 3*time.Second) - if err := s.masterService.SetQueryService(queryService); err != nil { + queryService := s.newQueryCoordClient(cms.Params.MetaRootPath, cms.Params.EtcdEndpoints, 3*time.Second) + if err := s.rootCoord.SetQueryCoord(queryService); err != nil { panic(err) } s.queryService = queryService } - return s.masterService.Init() + return s.rootCoord.Init() } func (s *Server) startGrpc() error { @@ -241,7 +241,7 @@ func (s *Server) startGrpcLoop(grpcPort int) { func (s *Server) start() error { log.Debug("Master Core start ...") - if err := s.masterService.Start(); err != nil { + if err := s.rootCoord.Start(); err != nil { return err } return nil @@ -268,9 +268,9 @@ func (s *Server) Stop() error { log.Debug("close queryService client", zap.Error(err)) } } - if s.masterService != nil { - if err := s.masterService.Stop(); err != nil { - log.Debug("close masterService", zap.Error(err)) + if s.rootCoord != nil { + if err := s.rootCoord.Stop(); err != nil { + log.Debug("close rootCoord", zap.Error(err)) } } s.cancel() @@ -282,87 +282,87 @@ func (s *Server) Stop() error { } func (s *Server) GetComponentStates(ctx context.Context, req *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) { - return s.masterService.GetComponentStates(ctx) + return s.rootCoord.GetComponentStates(ctx) } // GetTimeTickChannel receiver time tick from proxy service, and put it into this channel func (s *Server) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) { - return s.masterService.GetTimeTickChannel(ctx) + return s.rootCoord.GetTimeTickChannel(ctx) } // GetStatisticsChannel just define a channel, not used currently func (s *Server) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) { - return s.masterService.GetStatisticsChannel(ctx) + return s.rootCoord.GetStatisticsChannel(ctx) } //DDL request func (s *Server) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { - return s.masterService.CreateCollection(ctx, in) + return s.rootCoord.CreateCollection(ctx, in) } func (s *Server) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { - return s.masterService.DropCollection(ctx, in) + return s.rootCoord.DropCollection(ctx, in) } func (s *Server) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { - return s.masterService.HasCollection(ctx, in) + return s.rootCoord.HasCollection(ctx, in) } func (s *Server) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { - return s.masterService.DescribeCollection(ctx, in) + return s.rootCoord.DescribeCollection(ctx, in) } func (s *Server) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) { - return s.masterService.ShowCollections(ctx, in) + return s.rootCoord.ShowCollections(ctx, in) } func (s *Server) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { - return s.masterService.CreatePartition(ctx, in) + return s.rootCoord.CreatePartition(ctx, in) } func (s *Server) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { - return s.masterService.DropPartition(ctx, in) + return s.rootCoord.DropPartition(ctx, in) } func (s *Server) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { - return s.masterService.HasPartition(ctx, in) + return s.rootCoord.HasPartition(ctx, in) } func (s *Server) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) { - return s.masterService.ShowPartitions(ctx, in) + return s.rootCoord.ShowPartitions(ctx, in) } // CreateIndex index builder service func (s *Server) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { - return s.masterService.CreateIndex(ctx, in) + return s.rootCoord.CreateIndex(ctx, in) } func (s *Server) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error) { - return s.masterService.DropIndex(ctx, in) + return s.rootCoord.DropIndex(ctx, in) } func (s *Server) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { - return s.masterService.DescribeIndex(ctx, in) + return s.rootCoord.DescribeIndex(ctx, in) } // AllocTimestamp global timestamp allocator func (s *Server) AllocTimestamp(ctx context.Context, in *masterpb.AllocTimestampRequest) (*masterpb.AllocTimestampResponse, error) { - return s.masterService.AllocTimestamp(ctx, in) + return s.rootCoord.AllocTimestamp(ctx, in) } func (s *Server) AllocID(ctx context.Context, in *masterpb.AllocIDRequest) (*masterpb.AllocIDResponse, error) { - return s.masterService.AllocID(ctx, in) + return s.rootCoord.AllocID(ctx, in) } // UpdateChannelTimeTick used to handle ChannelTimeTickMsg func (s *Server) UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) { - return s.masterService.UpdateChannelTimeTick(ctx, in) + return s.rootCoord.UpdateChannelTimeTick(ctx, in) } func (s *Server) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { - return s.masterService.DescribeSegment(ctx, in) + return s.rootCoord.DescribeSegment(ctx, in) } func (s *Server) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) { - return s.masterService.ShowSegments(ctx, in) + return s.rootCoord.ShowSegments(ctx, in) } diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/service_test.go similarity index 96% rename from internal/distributed/masterservice/masterservice_test.go rename to internal/distributed/masterservice/service_test.go index 786197cbd6..bb67f5a991 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/service_test.go @@ -138,7 +138,7 @@ func TestGrpcService(t *testing.T) { t.Logf("master service port = %d", Params.Port) - core, ok := (svr.masterService).(*cms.Core) + core, ok := (svr.rootCoord).(*cms.Core) assert.True(t, ok) err = core.Register() @@ -146,7 +146,7 @@ func TestGrpcService(t *testing.T) { err = svr.startGrpc() assert.Nil(t, err) - svr.masterService.UpdateStateCode(internalpb.StateCode_Initializing) + svr.rootCoord.UpdateStateCode(internalpb.StateCode_Initializing) etcdCli, err := initEtcd(cms.Params.EtcdEndpoints) assert.Nil(t, err) @@ -169,7 +169,7 @@ func TestGrpcService(t *testing.T) { FlushedSegmentChan := make(chan *msgstream.MsgPack, 8) core.DataNodeFlushedSegmentChan = FlushedSegmentChan SegmentInfoChan := make(chan *msgstream.MsgPack, 8) - core.DataServiceSegmentChan = SegmentInfoChan + core.DataCoordSegmentChan = SegmentInfoChan timeTickArray := make([]typeutil.Timestamp, 0, 16) core.SendTimeTick = func(ts typeutil.Timestamp) error { @@ -247,13 +247,13 @@ func TestGrpcService(t *testing.T) { } cms.Params.Address = Params.Address - err = svr.masterService.Register() + err = svr.rootCoord.Register() assert.Nil(t, err) err = svr.start() assert.Nil(t, err) - svr.masterService.UpdateStateCode(internalpb.StateCode_Healthy) + svr.rootCoord.UpdateStateCode(internalpb.StateCode_Healthy) cli, err := grpcmasterserviceclient.NewClient(context.Background(), cms.Params.MetaRootPath, cms.Params.EtcdEndpoints, 3*time.Second) assert.Nil(t, err) @@ -820,14 +820,15 @@ type mockCore struct { func (m *mockCore) UpdateStateCode(internalpb.StateCode) { } -func (m *mockCore) SetDataService(context.Context, types.DataService) error { + +func (m *mockCore) SetDataCoord(context.Context, types.DataService) error { return nil } -func (m *mockCore) SetIndexService(types.IndexService) error { +func (m *mockCore) SetIndexCoord(types.IndexService) error { return nil } -func (m *mockCore) SetQueryService(types.QueryService) error { +func (m *mockCore) SetQueryCoord(types.QueryService) error { return nil } @@ -850,17 +851,17 @@ func (m *mockCore) Stop() error { func (m *mockCore) SetNewProxyClient(func(sess *sessionutil.Session) (types.ProxyNode, error)) { } -type mockDataService struct { +type mockDataCoord struct { types.DataService } -func (m *mockDataService) Init() error { +func (m *mockDataCoord) Init() error { return nil } -func (m *mockDataService) Start() error { +func (m *mockDataCoord) Start() error { return nil } -func (m *mockDataService) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { +func (m *mockDataCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) { return &internalpb.ComponentStates{ State: &internalpb.ComponentInfo{ StateCode: internalpb.StateCode_Healthy, @@ -875,7 +876,7 @@ func (m *mockDataService) GetComponentStates(ctx context.Context) (*internalpb.C }, }, nil } -func (m *mockDataService) Stop() error { +func (m *mockDataCoord) Stop() error { return fmt.Errorf("stop error") } @@ -910,10 +911,10 @@ func (m *mockQuery) Stop() error { func TestRun(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) svr := Server{ - masterService: &mockCore{}, - ctx: ctx, - cancel: cancel, - grpcErrChan: make(chan error), + rootCoord: &mockCore{}, + ctx: ctx, + cancel: cancel, + grpcErrChan: make(chan error), } Params.Init() Params.Port = 1000000 @@ -921,13 +922,13 @@ func TestRun(t *testing.T) { assert.NotNil(t, err) assert.EqualError(t, err, "listen tcp: address 1000000: invalid port") - svr.newDataServiceClient = func(string, []string, time.Duration) types.DataService { - return &mockDataService{} + svr.newDataCoordClient = func(string, []string, time.Duration) types.DataService { + return &mockDataCoord{} } - svr.newIndexServiceClient = func(string, []string, time.Duration) types.IndexService { + svr.newIndexCoordClient = func(string, []string, time.Duration) types.IndexService { return &mockIndex{} } - svr.newQueryServiceClient = func(string, []string, time.Duration) types.QueryService { + svr.newQueryCoordClient = func(string, []string, time.Duration) types.QueryService { return &mockQuery{} } diff --git a/internal/masterservice/proxy_node_manager.go b/internal/masterservice/proxy_node_manager.go index 06b9192db9..7dc2b3f641 100644 --- a/internal/masterservice/proxy_node_manager.go +++ b/internal/masterservice/proxy_node_manager.go @@ -91,7 +91,7 @@ func (p *proxyNodeManager) WatchProxyNode() error { f(sessions) } for _, s := range sessions { - metrics.MasterProxyNodeLister.WithLabelValues(metricProxyNode(s.ServerID)).Set(1) + metrics.RootCoordProxyNodeLister.WithLabelValues(metricProxyNode(s.ServerID)).Set(1) } for _, s := range sessions { log.Debug("Get proxy node", zap.Int64("node id", s.ServerID), zap.String("node addr", s.Address), zap.String("node name", s.ServerName)) @@ -131,7 +131,7 @@ func (p *proxyNodeManager) WatchProxyNode() error { f(sess) } p.lock.Unlock() - metrics.MasterProxyNodeLister.WithLabelValues(metricProxyNode(sess.ServerID)).Set(1) + metrics.RootCoordProxyNodeLister.WithLabelValues(metricProxyNode(sess.ServerID)).Set(1) case mvccpb.DELETE: sess := new(sessionutil.Session) err := json.Unmarshal(ev.PrevKv.Value, sess) @@ -144,7 +144,7 @@ func (p *proxyNodeManager) WatchProxyNode() error { f(sess) } p.lock.Unlock() - metrics.MasterProxyNodeLister.WithLabelValues(metricProxyNode(sess.ServerID)).Set(0) + metrics.RootCoordProxyNodeLister.WithLabelValues(metricProxyNode(sess.ServerID)).Set(0) } } } diff --git a/internal/masterservice/master_service.go b/internal/masterservice/root_coord.go similarity index 91% rename from internal/masterservice/master_service.go rename to internal/masterservice/root_coord.go index 32ce0ab74a..fb10d96732 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/root_coord.go @@ -76,20 +76,8 @@ func metricProxyNode(v int64) string { return fmt.Sprintf("client_%d", v) } -// Core master core +// Core root coordinator core type Core struct { - /* - ProxyServiceClient Interface: - get proxy service time tick channel,InvalidateCollectionMetaCache - - DataService Interface: - Segment States Channel, from DataService, if create new segment, data service should put the segment id into this channel, and let the master add the segment id to the collection meta - Segment Flush Watcher, monitor if segment has flushed into disk - - IndexService Interface - IndexService Sch, tell index service to build index - */ - MetaTable *metaTable //id allocator IDAllocator func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) @@ -120,8 +108,8 @@ type Core struct { //setMsgStreams, send drop partition into dd channel SendDdDropPartitionReq func(ctx context.Context, req *internalpb.DropPartitionRequest, channelNames []string) error - // if master create segment, data service will put segment msg into this channel - DataServiceSegmentChan <-chan *ms.MsgPack + // if rootcoord create segment, datacoord will put segment msg into this channel + DataCoordSegmentChan <-chan *ms.MsgPack // if segment flush completed, data node would put segment msg into this channel DataNodeFlushedSegmentChan <-chan *ms.MsgPack @@ -244,8 +232,8 @@ func (c *Core) checkInit() error { if c.CallReleaseCollectionService == nil { return fmt.Errorf("CallReleaseCollectionService is nil") } - if c.DataServiceSegmentChan == nil { - return fmt.Errorf("DataServiceSegmentChan is nil") + if c.DataCoordSegmentChan == nil { + return fmt.Errorf("DataCoordSegmentChan is nil") } if c.DataNodeFlushedSegmentChan == nil { return fmt.Errorf("DataNodeFlushedSegmentChan is nil") @@ -277,7 +265,7 @@ func (c *Core) startTimeTickLoop() { for { select { case <-c.ctx.Done(): - log.Debug("master context closed", zap.Error(c.ctx.Err())) + log.Debug("rootcoord context closed", zap.Error(c.ctx.Err())) return case <-ticker.C: if len(c.ddReqQueue) < 2 || cnt > 5 { @@ -298,14 +286,14 @@ func (c *Core) startTimeTickLoop() { } } -// data service send segment info msg to master when create segment -func (c *Core) startDataServiceSegmentLoop() { +// datacoord send segment info msg to rootcoord when create segment +func (c *Core) startDataCoordSegmentLoop() { for { select { case <-c.ctx.Done(): log.Debug("close data service segment loop") return - case segMsg, ok := <-c.DataServiceSegmentChan: + case segMsg, ok := <-c.DataCoordSegmentChan: if !ok { log.Debug("data service segment channel is closed, exit loop") return @@ -338,7 +326,7 @@ func (c *Core) startDataServiceSegmentLoop() { } if _, err := c.MetaTable.AddSegment(segInfos, startPosStr, endPosStr); err != nil { - //what if master add segment failed, but data service success? + //what if rootcoord add segment failed, but datacoord success? log.Debug("add segment info meta table failed ", zap.String("error", err.Error())) continue } @@ -471,7 +459,7 @@ func (c *Core) sessionLoop() { return case _, ok := <-c.sessCloseCh: if !ok { - log.Error("master service disconnect with etcd, process will exit in 1 second") + log.Error("rootcoord disconnect with etcd, process will exit in 1 second") go func() { time.Sleep(time.Second) os.Exit(-1) @@ -537,13 +525,13 @@ func (c *Core) setMsgStreams() error { return fmt.Errorf("MsgChannelSubName is emptyr") } - // master time tick channel + // rootcoord time tick channel if Params.TimeTickChannel == "" { return fmt.Errorf("TimeTickChannel is empty") } timeTickStream, _ := c.msFactory.NewMsgStream(c.ctx) timeTickStream.AsProducer([]string{Params.TimeTickChannel}) - log.Debug("masterservice AsProducer: " + Params.TimeTickChannel) + log.Debug("rootcoord AsProducer: " + Params.TimeTickChannel) c.SendTimeTick = func(t typeutil.Timestamp) error { msgPack := ms.MsgPack{} @@ -568,7 +556,7 @@ func (c *Core) setMsgStreams() error { if err := timeTickStream.Broadcast(&msgPack); err != nil { return err } - metrics.MasterDDChannelTimeTick.Set(float64(tsoutil.Mod24H(t))) + metrics.RootCoordDDChannelTimeTick.Set(float64(tsoutil.Mod24H(t))) //c.dmlChannels.BroadcastAll(&msgPack) pc := c.MetaTable.ListCollectionPhysicalChannels() @@ -665,7 +653,7 @@ func (c *Core) setMsgStreams() error { if err != nil { return err } - c.DataServiceSegmentChan = (*dsStream).Chan() + c.DataCoordSegmentChan = (*dsStream).Chan() // data node will put msg into this channel when flush segment dnChanName := Params.DataServiceSegmentChannel @@ -688,7 +676,7 @@ func (c *Core) SetNewProxyClient(f func(sess *sessionutil.Session) (types.ProxyN } } -func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { +func (c *Core) SetDataCoord(ctx context.Context, s types.DataService) error { rsp, err := s.GetSegmentInfoChannel(ctx) if err != nil { return err @@ -790,7 +778,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error { return nil } -func (c *Core) SetIndexService(s types.IndexService) error { +func (c *Core) SetIndexCoord(s types.IndexService) error { c.CallBuildIndexService = func(ctx context.Context, binlog []string, field *schemapb.FieldSchema, idxInfo *etcdpb.IndexInfo) (retID typeutil.UniqueID, retErr error) { defer func() { if err := recover(); err != nil { @@ -846,7 +834,7 @@ func (c *Core) SetIndexService(s types.IndexService) error { return nil } -func (c *Core) SetQueryService(s types.QueryService) error { +func (c *Core) SetQueryCoord(s types.QueryService) error { c.CallReleaseCollectionService = func(ctx context.Context, ts typeutil.Timestamp, dbID typeutil.UniqueID, collectionID typeutil.UniqueID) (retErr error) { defer func() { if err := recover(); err != nil { @@ -907,13 +895,13 @@ func (c *Core) BuildIndex(segID typeutil.UniqueID, field *schemapb.FieldSchema, return bldID, nil } -// Register register master service at etcd +// Register register rootcoord at etcd func (c *Core) Register() error { c.session = sessionutil.NewSession(c.ctx, Params.MetaRootPath, Params.EtcdEndpoints) if c.session == nil { return fmt.Errorf("session is nil, maybe the etcd client connection fails") } - c.sessCloseCh = c.session.Init(typeutil.MasterServiceRole, Params.Address, true) + c.sessCloseCh = c.session.Init(typeutil.RootCoordRole, Params.Address, true) return nil } @@ -1002,7 +990,7 @@ func (c *Core) Init() error { initError = c.setMsgStreams() }) if initError == nil { - log.Debug("Master service", zap.String("State Code", internalpb.StateCode_name[int32(internalpb.StateCode_Initializing)])) + log.Debug(typeutil.RootCoordRole, zap.String("State Code", internalpb.StateCode_name[int32(internalpb.StateCode_Initializing)])) } return initError } @@ -1125,32 +1113,32 @@ func (c *Core) reSendDdMsg(ctx context.Context) error { func (c *Core) Start() error { if err := c.checkInit(); err != nil { - log.Debug("MasterService Start checkInit failed", zap.Error(err)) + log.Debug("RootCoord Start checkInit failed", zap.Error(err)) return err } - log.Debug("MasterService", zap.Int64("node id", c.session.ServerID)) - log.Debug("MasterService", zap.String("time tick channel name", Params.TimeTickChannel)) + log.Debug(typeutil.RootCoordRole, zap.Int64("node id", c.session.ServerID)) + log.Debug(typeutil.RootCoordRole, zap.String("time tick channel name", Params.TimeTickChannel)) c.startOnce.Do(func() { if err := c.proxyNodeManager.WatchProxyNode(); err != nil { - log.Debug("MasterService Start WatchProxyNode failed", zap.Error(err)) + log.Debug("RootCoord Start WatchProxyNode failed", zap.Error(err)) return } if err := c.reSendDdMsg(c.ctx); err != nil { - log.Debug("MasterService Start reSendDdMsg failed", zap.Error(err)) + log.Debug("RootCoord Start reSendDdMsg failed", zap.Error(err)) return } go c.startDdScheduler() go c.startTimeTickLoop() - go c.startDataServiceSegmentLoop() + go c.startDataCoordSegmentLoop() go c.startDataNodeFlushedSegmentLoop() go c.tsLoop() go c.sessionLoop() go c.chanTimeTick.StartWatch() c.stateCode.Store(internalpb.StateCode_Healthy) }) - log.Debug("MasterService", zap.String("State Code", internalpb.StateCode_name[int32(internalpb.StateCode_Healthy)])) + log.Debug(typeutil.RootCoordRole, zap.String("State Code", internalpb.StateCode_name[int32(internalpb.StateCode_Healthy)])) return nil } @@ -1167,7 +1155,7 @@ func (c *Core) GetComponentStates(ctx context.Context) (*internalpb.ComponentSta return &internalpb.ComponentStates{ State: &internalpb.ComponentInfo{ NodeID: c.session.ServerID, - Role: typeutil.MasterServiceRole, + Role: typeutil.RootCoordRole, StateCode: code, ExtraInfo: nil, }, @@ -1178,7 +1166,7 @@ func (c *Core) GetComponentStates(ctx context.Context) (*internalpb.ComponentSta SubcomponentStates: []*internalpb.ComponentInfo{ { NodeID: c.session.ServerID, - Role: typeutil.MasterServiceRole, + Role: typeutil.RootCoordRole, StateCode: code, ExtraInfo: nil, }, @@ -1207,7 +1195,7 @@ func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringRespon } func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { - metrics.MasterCreateCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() + metrics.RootCoordCreateCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { return &commonpb.Status{ @@ -1234,7 +1222,7 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti }, nil } log.Debug("CreateCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) - metrics.MasterCreateCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() + metrics.RootCoordCreateCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1242,7 +1230,7 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti } func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { - metrics.MasterDropCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() + metrics.RootCoordDropCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { return &commonpb.Status{ @@ -1269,7 +1257,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe }, nil } log.Debug("DropCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) - metrics.MasterDropCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() + metrics.RootCoordDropCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1277,7 +1265,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe } func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { - metrics.MasterHasCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() + metrics.RootCoordHasCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { return &milvuspb.BoolResponse{ @@ -1311,7 +1299,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ }, nil } log.Debug("HasCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) - metrics.MasterHasCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() + metrics.RootCoordHasCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() return &milvuspb.BoolResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -1322,7 +1310,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ } func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { - metrics.MasterDescribeCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() + metrics.RootCoordDescribeCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { return &milvuspb.DescribeCollectionResponse{ @@ -1357,7 +1345,7 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl }, nil } log.Debug("DescribeCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) - metrics.MasterDescribeCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() + metrics.RootCoordDescribeCollectionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1367,7 +1355,7 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl } func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) { - metrics.MasterShowCollectionsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() + metrics.RootCoordShowCollectionsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { return &milvuspb.ShowCollectionsResponse{ @@ -1404,7 +1392,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections }, nil } log.Debug("ShowCollections Success", zap.String("dbname", in.DbName), zap.Strings("collection names", t.Rsp.CollectionNames), zap.Int64("msgID", in.Base.MsgID)) - metrics.MasterShowCollectionsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() + metrics.RootCoordShowCollectionsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1413,7 +1401,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections } func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { - metrics.MasterCreatePartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() + metrics.RootCoordCreatePartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { return &commonpb.Status{ @@ -1440,7 +1428,7 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition }, nil } log.Debug("CreatePartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) - metrics.MasterCreatePartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() + metrics.RootCoordCreatePartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1448,7 +1436,7 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition } func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { - metrics.MasterDropPartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() + metrics.RootCoordDropPartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { return &commonpb.Status{ @@ -1475,7 +1463,7 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ }, nil } log.Debug("DropPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) - metrics.MasterDropPartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() + metrics.RootCoordDropPartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1483,7 +1471,7 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ } func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { - metrics.MasterHasPartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() + metrics.RootCoordHasPartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { return &milvuspb.BoolResponse{ @@ -1517,7 +1505,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques }, nil } log.Debug("HasPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) - metrics.MasterHasPartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() + metrics.RootCoordHasPartitionCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() return &milvuspb.BoolResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -1528,17 +1516,17 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques } func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) { - metrics.MasterShowPartitionsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() + metrics.RootCoordShowPartitionsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() log.Debug("ShowPartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", in.Base.MsgID), zap.String("collection", in.CollectionName)) code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { - log.Debug("ShowPartitionRequest failed: master is not healthy", zap.String("role", Params.RoleName), + log.Debug("ShowPartitionRequest failed: rootcoord is not healthy", zap.String("role", Params.RoleName), zap.Int64("msgID", in.Base.MsgID), zap.String("state", internalpb.StateCode_name[int32(code)])) return &milvuspb.ShowPartitionsResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("master is not healthy, state code = %s", internalpb.StateCode_name[int32(code)]), + Reason: fmt.Sprintf("rootcoord is not healthy, state code = %s", internalpb.StateCode_name[int32(code)]), }, PartitionNames: nil, PartitionIDs: nil, @@ -1571,7 +1559,7 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe log.Debug("ShowPartitions succeed", zap.String("role", Params.RoleName), zap.Int64("msgID", t.Req.Base.MsgID), zap.String("collection name", in.CollectionName), zap.Strings("partition names", t.Rsp.PartitionNames), zap.Int64s("partition ids", t.Rsp.PartitionIDs)) - metrics.MasterShowPartitionsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() + metrics.RootCoordShowPartitionsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1580,7 +1568,7 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe } func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { - metrics.MasterCreateIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() + metrics.RootCoordCreateIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { return &commonpb.Status{ @@ -1607,7 +1595,7 @@ func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) }, nil } log.Debug("CreateIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID)) - metrics.MasterCreateIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() + metrics.RootCoordCreateIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1615,7 +1603,7 @@ func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) } func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { - metrics.MasterDescribeIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() + metrics.RootCoordDescribeIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { return &milvuspb.DescribeIndexResponse{ @@ -1656,7 +1644,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ idxNames = append(idxNames, i.IndexName) } log.Debug("DescribeIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Strings("index names", idxNames), zap.Int64("msgID", in.Base.MsgID)) - metrics.MasterDescribeIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() + metrics.RootCoordDescribeIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() if len(t.Rsp.IndexDescriptions) == 0 { t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_IndexNotExist, @@ -1672,7 +1660,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ } func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error) { - metrics.MasterDropIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() + metrics.RootCoordDropIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { return &commonpb.Status{ @@ -1699,7 +1687,7 @@ func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*c }, nil } log.Debug("DropIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID)) - metrics.MasterDropIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() + metrics.RootCoordDropIndexCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1707,7 +1695,7 @@ func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*c } func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { - metrics.MasterDescribeSegmentCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() + metrics.RootCoordDescribeSegmentCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { return &milvuspb.DescribeSegmentResponse{ @@ -1744,7 +1732,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment }, nil } log.Debug("DescribeSegment Success", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID)) - metrics.MasterDescribeSegmentCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() + metrics.RootCoordDescribeSegmentCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", @@ -1753,7 +1741,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment } func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) { - metrics.MasterShowSegmentsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() + metrics.RootCoordShowSegmentsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsTotal).Inc() code := c.stateCode.Load().(internalpb.StateCode) if code != internalpb.StateCode_Healthy { return &milvuspb.ShowSegmentsResponse{ @@ -1790,7 +1778,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques }, nil } log.Debug("ShowSegments Success", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64s("segments ids", t.Rsp.SegmentIDs), zap.Int64("msgID", in.Base.MsgID)) - metrics.MasterShowSegmentsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() + metrics.RootCoordShowSegmentsCounter.WithLabelValues(metricProxyNode(in.Base.SourceID), MetricRequestsSuccess).Inc() t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "", diff --git a/internal/masterservice/master_service_test.go b/internal/masterservice/root_coord_test.go similarity index 99% rename from internal/masterservice/master_service_test.go rename to internal/masterservice/root_coord_test.go index 46e35bb8d2..844003c265 100644 --- a/internal/masterservice/master_service_test.go +++ b/internal/masterservice/root_coord_test.go @@ -307,7 +307,7 @@ func TestMasterService(t *testing.T) { } dm := &dataMock{randVal: randVal} - err = core.SetDataService(ctx, dm) + err = core.SetDataCoord(ctx, dm) assert.Nil(t, err) im := &indexMock{ @@ -317,14 +317,14 @@ func TestMasterService(t *testing.T) { idxDropID: []int64{}, mutex: sync.Mutex{}, } - err = core.SetIndexService(im) + err = core.SetIndexCoord(im) assert.Nil(t, err) qm := &queryMock{ collID: nil, mutex: sync.Mutex{}, } - err = core.SetQueryService(qm) + err = core.SetQueryCoord(qm) assert.Nil(t, err) tmpFactory := msgstream.NewPmsFactory() @@ -1763,7 +1763,7 @@ func TestMasterService2(t *testing.T) { assert.Nil(t, err) dm := &dataMock{randVal: randVal} - err = core.SetDataService(ctx, dm) + err = core.SetDataCoord(ctx, dm) assert.Nil(t, err) im := &indexMock{ @@ -1773,14 +1773,14 @@ func TestMasterService2(t *testing.T) { idxDropID: []int64{}, mutex: sync.Mutex{}, } - err = core.SetIndexService(im) + err = core.SetIndexCoord(im) assert.Nil(t, err) qm := &queryMock{ collID: nil, mutex: sync.Mutex{}, } - err = core.SetQueryService(qm) + err = core.SetQueryCoord(qm) assert.Nil(t, err) core.NewProxyClient = func(*sessionutil.Session) (types.ProxyNode, error) { @@ -1966,7 +1966,7 @@ func TestCheckInit(t *testing.T) { err = c.checkInit() assert.NotNil(t, err) - c.DataServiceSegmentChan = make(chan *msgstream.MsgPack) + c.DataCoordSegmentChan = make(chan *msgstream.MsgPack) err = c.checkInit() assert.NotNil(t, err) diff --git a/internal/masterservice/timestamp_test.go b/internal/masterservice/timestamp_test.go index 520d486580..35dc53aa23 100644 --- a/internal/masterservice/timestamp_test.go +++ b/internal/masterservice/timestamp_test.go @@ -89,13 +89,13 @@ func BenchmarkAllocTimestamp(b *testing.B) { Params.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.KvRootPath) Params.MsgChannelSubName = fmt.Sprintf("subname-%d", randVal) - err = core.SetDataService(ctx, &tbd{}) + err = core.SetDataCoord(ctx, &tbd{}) assert.Nil(b, err) - err = core.SetIndexService(&tbi{}) + err = core.SetIndexCoord(&tbi{}) assert.Nil(b, err) - err = core.SetQueryService(&tbq{}) + err = core.SetQueryCoord(&tbq{}) assert.Nil(b, err) err = core.Register() diff --git a/internal/masterservice/timeticksync.go b/internal/masterservice/timeticksync.go index 18cb2ccd98..b3bd0af4e5 100644 --- a/internal/masterservice/timeticksync.go +++ b/internal/masterservice/timeticksync.go @@ -197,7 +197,7 @@ func (t *timetickSync) SendChannelTimeTick(chanName string, ts typeutil.Timestam err := t.core.dmlChannels.Broadcast(chanName, &msgPack) if err == nil { - metrics.MasterInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(tsoutil.Mod24H(ts))) + metrics.RootCoordInsertChannelTimeTick.WithLabelValues(chanName).Set(float64(tsoutil.Mod24H(ts))) } return err } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 8385f4727a..862ac6dec6 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -11,8 +11,9 @@ import ( ) const ( - milvusNamespace = `milvus` - subSystemDataService = `dataservice` + milvusNamespace = "milvus" + subSystemRootCoord = "rootcoord" + subSystemDataCoord = "dataCoord" ) /* @@ -28,11 +29,11 @@ var ( */ var ( - // MasterProxyNodeLister used to count the num of registered proxy nodes - MasterProxyNodeLister = prometheus.NewGaugeVec( + // RootCoordProxyNodeLister used to count the num of registered proxy nodes + RootCoordProxyNodeLister = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "list_of_proxy_node", Help: "List of proxy nodes which has register with etcd", }, []string{"client_id"}) @@ -40,128 +41,128 @@ var ( //////////////////////////////////////////////////////////////////////////// // for grpc - // MasterCreateCollectionCounter used to count the num of calls of CreateCollection - MasterCreateCollectionCounter = prometheus.NewCounterVec( + // RootCoordCreateCollectionCounter used to count the num of calls of CreateCollection + RootCoordCreateCollectionCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "create_collection_total", Help: "Counter of create collection", }, []string{"client_id", "type"}) - // MasterDropCollectionCounter used to count the num of calls of DropCollection - MasterDropCollectionCounter = prometheus.NewCounterVec( + // RootCoordDropCollectionCounter used to count the num of calls of DropCollection + RootCoordDropCollectionCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "drop_collection_total", Help: "Counter of drop collection", }, []string{"client_id", "type"}) - // MasterHasCollectionCounter used to count the num of calls of HasCollection - MasterHasCollectionCounter = prometheus.NewCounterVec( + // RootCoordHasCollectionCounter used to count the num of calls of HasCollection + RootCoordHasCollectionCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "has_collection_total", Help: "Counter of has collection", }, []string{"client_id", "type"}) - // MasterDescribeCollectionCounter used to count the num of calls of DescribeCollection - MasterDescribeCollectionCounter = prometheus.NewCounterVec( + // RootCoordDescribeCollectionCounter used to count the num of calls of DescribeCollection + RootCoordDescribeCollectionCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "describe_collection_total", Help: "Counter of describe collection", }, []string{"client_id", "type"}) - // MasterShowCollectionsCounter used to count the num of calls of ShowCollections - MasterShowCollectionsCounter = prometheus.NewCounterVec( + // RootCoordShowCollectionsCounter used to count the num of calls of ShowCollections + RootCoordShowCollectionsCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "show_collections_total", Help: "Counter of show collections", }, []string{"client_id", "type"}) - // MasterCreatePartitionCounter used to count the num of calls of CreatePartition - MasterCreatePartitionCounter = prometheus.NewCounterVec( + // RootCoordCreatePartitionCounter used to count the num of calls of CreatePartition + RootCoordCreatePartitionCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "create_partition_total", Help: "Counter of create partition", }, []string{"client_id", "type"}) - // MasterDropPartitionCounter used to count the num of calls of DropPartition - MasterDropPartitionCounter = prometheus.NewCounterVec( + // RootCoordDropPartitionCounter used to count the num of calls of DropPartition + RootCoordDropPartitionCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "drop_partition_total", Help: "Counter of drop partition", }, []string{"client_id", "type"}) - // MasterHasPartitionCounter used to count the num of calls of HasPartition - MasterHasPartitionCounter = prometheus.NewCounterVec( + // RootCoordHasPartitionCounter used to count the num of calls of HasPartition + RootCoordHasPartitionCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "has_partition_total", Help: "Counter of has partition", }, []string{"client_id", "type"}) - // MasterShowPartitionsCounter used to count the num of calls of ShowPartitions - MasterShowPartitionsCounter = prometheus.NewCounterVec( + // RootCoordShowPartitionsCounter used to count the num of calls of ShowPartitions + RootCoordShowPartitionsCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "show_partitions_total", Help: "Counter of show partitions", }, []string{"client_id", "type"}) - // MasterCreateIndexCounter used to count the num of calls of CreateIndex - MasterCreateIndexCounter = prometheus.NewCounterVec( + // RootCoordCreateIndexCounter used to count the num of calls of CreateIndex + RootCoordCreateIndexCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "create_index_total", Help: "Counter of create index", }, []string{"client_id", "type"}) - // MasterDropIndexCounter used to count the num of calls of DropIndex - MasterDropIndexCounter = prometheus.NewCounterVec( + // RootCoordDropIndexCounter used to count the num of calls of DropIndex + RootCoordDropIndexCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "drop_index_total", Help: "Counter of drop index", }, []string{"client_id", "type"}) - // MasterDescribeIndexCounter used to count the num of calls of DescribeIndex - MasterDescribeIndexCounter = prometheus.NewCounterVec( + // RootCoordDescribeIndexCounter used to count the num of calls of DescribeIndex + RootCoordDescribeIndexCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "describe_index_total", Help: "Counter of describe index", }, []string{"client_id", "type"}) - // MasterDescribeSegmentCounter used to count the num of calls of DescribeSegment - MasterDescribeSegmentCounter = prometheus.NewCounterVec( + // RootCoordDescribeSegmentCounter used to count the num of calls of DescribeSegment + RootCoordDescribeSegmentCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "describe_segment_total", Help: "Counter of describe segment", }, []string{"client_id", "type"}) - // MasterShowSegmentsCounter used to count the num of calls of ShowSegments - MasterShowSegmentsCounter = prometheus.NewCounterVec( + // RootCoordShowSegmentsCounter used to count the num of calls of ShowSegments + RootCoordShowSegmentsCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "show_segments_total", Help: "Counter of show segments", }, []string{"client_id", "type"}) @@ -169,48 +170,48 @@ var ( //////////////////////////////////////////////////////////////////////////// // for time tick - // MasterInsertChannelTimeTick used to count the time tick num of insert channel in 24H - MasterInsertChannelTimeTick = prometheus.NewGaugeVec( + // RootCoordInsertChannelTimeTick used to count the time tick num of insert channel in 24H + RootCoordInsertChannelTimeTick = prometheus.NewGaugeVec( prometheus.GaugeOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "insert_channel_time_tick", Help: "Time tick of insert Channel in 24H", }, []string{"vchannel"}) - // MasterDDChannelTimeTick used to count the time tick num of dd channel in 24H - MasterDDChannelTimeTick = prometheus.NewGauge( + // RootCoordDDChannelTimeTick used to count the time tick num of dd channel in 24H + RootCoordDDChannelTimeTick = prometheus.NewGauge( prometheus.GaugeOpts{ - Namespace: "milvus", - Subsystem: "master", + Namespace: milvusNamespace, + Subsystem: subSystemRootCoord, Name: "dd_channel_time_tick", Help: "Time tick of dd Channel in 24H", }) ) -//RegisterMaster register Master metrics -func RegisterMaster() { - prometheus.MustRegister(MasterProxyNodeLister) +//RegisterRootCoord register RootCoord metrics +func RegisterRootCoord() { + prometheus.MustRegister(RootCoordProxyNodeLister) // for grpc - prometheus.MustRegister(MasterCreateCollectionCounter) - prometheus.MustRegister(MasterDropCollectionCounter) - prometheus.MustRegister(MasterHasCollectionCounter) - prometheus.MustRegister(MasterDescribeCollectionCounter) - prometheus.MustRegister(MasterShowCollectionsCounter) - prometheus.MustRegister(MasterCreatePartitionCounter) - prometheus.MustRegister(MasterDropPartitionCounter) - prometheus.MustRegister(MasterHasPartitionCounter) - prometheus.MustRegister(MasterShowPartitionsCounter) - prometheus.MustRegister(MasterCreateIndexCounter) - prometheus.MustRegister(MasterDropIndexCounter) - prometheus.MustRegister(MasterDescribeIndexCounter) - prometheus.MustRegister(MasterDescribeSegmentCounter) - prometheus.MustRegister(MasterShowSegmentsCounter) + prometheus.MustRegister(RootCoordCreateCollectionCounter) + prometheus.MustRegister(RootCoordDropCollectionCounter) + prometheus.MustRegister(RootCoordHasCollectionCounter) + prometheus.MustRegister(RootCoordDescribeCollectionCounter) + prometheus.MustRegister(RootCoordShowCollectionsCounter) + prometheus.MustRegister(RootCoordCreatePartitionCounter) + prometheus.MustRegister(RootCoordDropPartitionCounter) + prometheus.MustRegister(RootCoordHasPartitionCounter) + prometheus.MustRegister(RootCoordShowPartitionsCounter) + prometheus.MustRegister(RootCoordCreateIndexCounter) + prometheus.MustRegister(RootCoordDropIndexCounter) + prometheus.MustRegister(RootCoordDescribeIndexCounter) + prometheus.MustRegister(RootCoordDescribeSegmentCounter) + prometheus.MustRegister(RootCoordShowSegmentsCounter) // for time tick - prometheus.MustRegister(MasterInsertChannelTimeTick) - prometheus.MustRegister(MasterDDChannelTimeTick) + prometheus.MustRegister(RootCoordInsertChannelTimeTick) + prometheus.MustRegister(RootCoordDDChannelTimeTick) //prometheus.MustRegister(PanicCounter) } @@ -219,8 +220,8 @@ func RegisterProxyNode() { } -//RegisterQueryService register QueryService metrics -func RegisterQueryService() { +//RegisterQueryCoord register QueryCoord metrics +func RegisterQueryCoord() { } @@ -230,20 +231,20 @@ func RegisterQueryNode() { } var ( - //DataServiceDataNodeList records the num of regsitered data nodes - DataServiceDataNodeList = prometheus.NewGaugeVec( + //DataCoordDataNodeList records the num of regsitered data nodes + DataCoordDataNodeList = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: milvusNamespace, - Subsystem: subSystemDataService, + Subsystem: subSystemDataCoord, Name: "list_of_data_node", Help: "List of data nodes regsitered within etcd", }, []string{"status"}, ) ) -//RegisterDataService register DataService metrics -func RegisterDataService() { - prometheus.Register(DataServiceDataNodeList) +//RegisterDataCoord register DataService metrics +func RegisterDataCoord() { + prometheus.Register(DataCoordDataNodeList) } //RegisterDataNode register DataNode metrics @@ -251,8 +252,8 @@ func RegisterDataNode() { } -//RegisterIndexService register IndexService metrics -func RegisterIndexService() { +//RegisterIndexCoord register IndexCoord metrics +func RegisterIndexCoord() { } @@ -261,8 +262,8 @@ func RegisterIndexNode() { } -//RegisterMsgStreamService register MsgStreamService metrics -func RegisterMsgStreamService() { +//RegisterMsgStreamCoord register MsgStreamCoord metrics +func RegisterMsgStreamCoord() { } diff --git a/internal/metrics/metrics_test.go b/internal/metrics/metrics_test.go index 48601350b0..7bb7472a7f 100644 --- a/internal/metrics/metrics_test.go +++ b/internal/metrics/metrics_test.go @@ -6,13 +6,13 @@ import ( func TestRegisterMetrics(t *testing.T) { // Make sure it doesn't panic. - RegisterMaster() + RegisterRootCoord() RegisterDataNode() - RegisterDataService() + RegisterDataCoord() RegisterIndexNode() - RegisterIndexService() + RegisterIndexCoord() RegisterProxyNode() RegisterQueryNode() - RegisterQueryService() - RegisterMsgStreamService() + RegisterQueryCoord() + RegisterMsgStreamCoord() } diff --git a/internal/types/types.go b/internal/types/types.go index 87d9875ba3..9b7c8fe3db 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -118,9 +118,9 @@ type MasterComponent interface { MasterService UpdateStateCode(internalpb.StateCode) - SetDataService(context.Context, DataService) error - SetIndexService(IndexService) error - SetQueryService(QueryService) error + SetDataCoord(context.Context, DataService) error + SetIndexCoord(IndexService) error + SetQueryCoord(QueryService) error SetNewProxyClient(func(sess *sessionutil.Session) (ProxyNode, error)) } diff --git a/internal/util/typeutil/type.go b/internal/util/typeutil/type.go index 0407059487..52bae5971c 100644 --- a/internal/util/typeutil/type.go +++ b/internal/util/typeutil/type.go @@ -16,12 +16,12 @@ type IntPrimaryKey = int64 type UniqueID = int64 const ( - MasterServiceRole = "MasterService" - ProxyNodeRole = "ProxyNode" - QueryServiceRole = "QueryService" - QueryNodeRole = "QueryNode" - IndexServiceRole = "IndexService" - IndexNodeRole = "IndexNode" - DataServiceRole = "DataService" - DataNodeRole = "DataNode" + RootCoordRole = "RootCoord" + ProxyNodeRole = "ProxyNode" + QueryServiceRole = "QueryService" + QueryNodeRole = "QueryNode" + IndexServiceRole = "IndexService" + IndexNodeRole = "IndexNode" + DataServiceRole = "DataService" + DataNodeRole = "DataNode" )