diff --git a/internal/datacoord/metrics_info.go b/internal/datacoord/metrics_info.go index fec0ff94f2..9a5b6a77b1 100644 --- a/internal/datacoord/metrics_info.go +++ b/internal/datacoord/metrics_info.go @@ -123,7 +123,7 @@ func (s *Server) getDataCoordMetrics() metricsinfo.DataCoordInfos { CreatedTime: paramtable.GetCreateTime().String(), UpdatedTime: paramtable.GetUpdateTime().String(), Type: typeutil.DataCoordRole, - ID: s.session.ServerID, + ID: paramtable.GetNodeID(), }, SystemConfigurations: metricsinfo.DataCoordConfiguration{ SegmentMaxSize: Params.DataCoordCfg.SegmentMaxSize.GetAsFloat(), diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index ee519ede46..ba59f33d45 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -250,7 +250,7 @@ func (s *Server) Register() error { } } go s.session.LivenessCheck(s.serverLoopCtx, func() { - logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.ServerID)) + logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", paramtable.GetNodeID())) if err := s.Stop(); err != nil { logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err)) } @@ -735,7 +735,7 @@ func (s *Server) startWatchService(ctx context.Context) { func (s *Server) stopServiceWatch() { // ErrCompacted is handled inside SessionWatcher, which means there is some other error occurred, closing server. - logutil.Logger(s.ctx).Error("watch service channel closed", zap.Int64("serverID", s.session.ServerID)) + logutil.Logger(s.ctx).Error("watch service channel closed", zap.Int64("serverID", paramtable.GetNodeID())) go s.Stop() if s.session.TriggerKill { if p, err := os.FindProcess(os.Getpid()); err == nil { diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 3151395be8..06be2d75ed 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -23,6 +23,8 @@ import ( "strconv" "sync" + "github.com/milvus-io/milvus/internal/common" + "github.com/samber/lo" "go.opentelemetry.io/otel" "go.uber.org/zap" @@ -31,7 +33,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus-proto/go-api/msgpb" - "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -588,11 +589,11 @@ func (s *Server) GetStateCode() commonpb.StateCode { // GetComponentStates returns DataCoord's current state func (s *Server) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) { + code := s.GetStateCode() nodeID := common.NotRegisteredID if s.session != nil && s.session.Registered() { nodeID = s.session.ServerID // or Params.NodeID } - code := s.GetStateCode() resp := &milvuspb.ComponentStates{ State: &milvuspb.ComponentInfo{ // NodeID: Params.NodeID, // will race with Server.Register() @@ -1432,7 +1433,7 @@ func (s *Server) BroadcastAlteredCollection(ctx context.Context, req *datapb.Alt func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { if s.isClosed() { - reason := errorutil.UnHealthReason("datacoord", s.session.ServerID, "datacoord is closed") + reason := errorutil.UnHealthReason("datacoord", paramtable.GetNodeID(), "datacoord is closed") return &milvuspb.CheckHealthResponse{IsHealthy: false, Reasons: []string{reason}}, nil } diff --git a/internal/querycoordv2/handlers.go b/internal/querycoordv2/handlers.go index c42deab5ff..56da270ae0 100644 --- a/internal/querycoordv2/handlers.go +++ b/internal/querycoordv2/handlers.go @@ -191,7 +191,7 @@ func (s *Server) getSystemInfoMetrics( CreatedTime: paramtable.GetCreateTime().String(), UpdatedTime: paramtable.GetUpdateTime().String(), Type: typeutil.QueryCoordRole, - ID: s.session.ServerID, + ID: paramtable.GetNodeID(), }, SystemConfigurations: metricsinfo.QueryCoordConfiguration{ SearchChannelPrefix: Params.CommonCfg.QueryCoordSearch.GetValue(), diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 54f93b85c5..1008aaf3e9 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -25,6 +25,8 @@ import ( "syscall" "time" + "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/cockroachdb/errors" "github.com/milvus-io/milvus/internal/metrics" @@ -134,7 +136,7 @@ func (s *Server) Register() error { } } go s.session.LivenessCheck(s.ctx, func() { - log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.ServerID)) + log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", paramtable.GetNodeID())) if err := s.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } @@ -588,7 +590,7 @@ func (s *Server) watchNodes(revision int64) { case event, ok := <-eventChan: if !ok { // ErrCompacted is handled inside SessionWatcher - log.Error("Session Watcher channel closed", zap.Int64("serverID", s.session.ServerID)) + log.Error("Session Watcher channel closed", zap.Int64("serverID", paramtable.GetNodeID())) go s.Stop() if s.session.TriggerKill { if p, err := os.FindProcess(os.Getpid()); err == nil { diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index f463ed6605..fedcb8d2be 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -947,7 +947,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { if s.status.Load() != commonpb.StateCode_Healthy { - reason := errorutil.UnHealthReason("querycoord", s.session.ServerID, "querycoord is unhealthy") + reason := errorutil.UnHealthReason("querycoord", paramtable.GetNodeID(), "querycoord is unhealthy") return &milvuspb.CheckHealthResponse{IsHealthy: false, Reasons: []string{reason}}, nil }