Fix fetching nodeID through session before init it (#22290)

Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
yah01 2023-02-21 18:08:27 +08:00 committed by GitHub
parent d4e0b6e91b
commit 6f374fa19d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/typeutil"
@ -280,7 +281,7 @@ func (node *QueryNode) getStatisticsWithDmlChannel(ctx context.Context, req *que
// WatchDmChannels create consumers on dmChannels to receive Incremental datawhich is the important part of real-time query
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
nodeID := node.GetSession().ServerID
nodeID := paramtable.GetNodeID()
// check node healthy
if !node.lifetime.Add(commonpbutil.IsHealthy) {
err := fmt.Errorf("query node %d is not ready", nodeID)
@ -371,7 +372,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *querypb.WatchDmC
func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmChannelRequest) (*commonpb.Status, error) {
// check node healthy
nodeID := node.GetSession().ServerID
nodeID := paramtable.GetNodeID()
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
err := fmt.Errorf("query node %d is not ready", nodeID)
status := &commonpb.Status{
@ -429,7 +430,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
// LoadSegments load historical data into query node, historical data can be vector data or index
func (node *QueryNode) LoadSegments(ctx context.Context, in *querypb.LoadSegmentsRequest) (*commonpb.Status, error) {
nodeID := node.GetSession().ServerID
nodeID := paramtable.GetNodeID()
// check node healthy
if !node.lifetime.Add(commonpbutil.IsHealthy) {
err := fmt.Errorf("query node %d is not ready", nodeID)
@ -610,7 +611,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *querypb.Releas
// ReleaseSegments remove the specified segments from query node according segmentIDs, partitionIDs, and collectionID
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
nodeID := node.GetSession().ServerID
nodeID := paramtable.GetNodeID()
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
err := fmt.Errorf("query node %d is not ready", nodeID)
status := &commonpb.Status{
@ -707,7 +708,7 @@ func filterSegmentInfo(segmentInfos []*querypb.SegmentInfo, segmentIDs map[int64
// Search performs replica search tasks.
func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
nodeID := node.GetSession().ServerID
nodeID := paramtable.GetNodeID()
if !node.IsStandAlone && req.GetReq().GetBase().GetTargetID() != nodeID {
return &internalpb.SearchResults{
Status: &commonpb.Status{
@ -789,7 +790,7 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
}
func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.SearchRequest, dmlChannel string) (*internalpb.SearchResults, error) {
nodeID := node.GetSession().ServerID
nodeID := paramtable.GetNodeID()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(nodeID), metrics.SearchLabel, metrics.TotalLabel).Inc()
failRet := &internalpb.SearchResults{
Status: &commonpb.Status{
@ -936,7 +937,7 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se
}
func (node *QueryNode) queryWithDmlChannel(ctx context.Context, req *querypb.QueryRequest, dmlChannel string) (*internalpb.RetrieveResults, error) {
nodeID := node.GetSession().ServerID
nodeID := paramtable.GetNodeID()
metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(nodeID), metrics.QueryLabel, metrics.TotalLabel).Inc()
failRet := &internalpb.RetrieveResults{
Status: &commonpb.Status{
@ -1091,7 +1092,7 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i
zap.Uint64("guaranteeTimestamp", req.Req.GetGuaranteeTimestamp()),
zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))
nodeID := node.GetSession().ServerID
nodeID := paramtable.GetNodeID()
if req.GetReq().GetBase().GetTargetID() != nodeID {
return &internalpb.RetrieveResults{
Status: &commonpb.Status{
@ -1194,7 +1195,7 @@ func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.Syn
// ShowConfigurations returns the configurations of queryNode matching req.Pattern
func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
nodeID := node.GetSession().ServerID
nodeID := paramtable.GetNodeID()
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
log.Warn("QueryNode.ShowConfigurations failed",
zap.Int64("nodeId", nodeID),
@ -1231,7 +1232,7 @@ func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.S
// GetMetrics return system infos of the query node, such as total memory, memory usage, cpu usage ...
func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
nodeID := node.GetSession().ServerID
nodeID := paramtable.GetNodeID()
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
log.Ctx(ctx).Warn("QueryNode.GetMetrics failed",
zap.Int64("nodeId", nodeID),
@ -1296,10 +1297,10 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR
}
func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.GetDataDistributionRequest) (*querypb.GetDataDistributionResponse, error) {
nodeID := node.GetSession().ServerID
nodeID := paramtable.GetNodeID()
log := log.With(
zap.Int64("msg-id", req.GetBase().GetMsgID()),
zap.Int64("node-id", nodeID),
zap.Int64("msgID", req.GetBase().GetMsgID()),
zap.Int64("nodeID", nodeID),
)
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
log.Warn("QueryNode.GetMetrics failed",
@ -1391,7 +1392,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) {
log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()), zap.String("channel", req.GetChannel()))
nodeID := node.GetSession().ServerID
nodeID := paramtable.GetNodeID()
// check node healthy
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
err := fmt.Errorf("query node %d is not ready", nodeID)