From 6f374fa19d4bbd7013f5ce64281f20d5786ef44f Mon Sep 17 00:00:00 2001 From: yah01 Date: Tue, 21 Feb 2023 18:08:27 +0800 Subject: [PATCH] Fix fetching nodeID through session before init it (#22290) Signed-off-by: yah01 --- internal/querynode/impl.go | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index d848011e4d..3dfd5ca71b 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/commonpbutil" "github.com/milvus-io/milvus/internal/util/metricsinfo" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -280,7 +281,7 @@ func (node *QueryNode) getStatisticsWithDmlChannel(ctx context.Context, req *que // WatchDmChannels create consumers on dmChannels to receive Incremental data,which is the important part of real-time query func (node *QueryNode) WatchDmChannels(ctx context.Context, in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) { - nodeID := node.GetSession().ServerID + nodeID := paramtable.GetNodeID() // check node healthy if !node.lifetime.Add(commonpbutil.IsHealthy) { err := fmt.Errorf("query node %d is not ready", nodeID) @@ -371,7 +372,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *querypb.WatchDmC func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmChannelRequest) (*commonpb.Status, error) { // check node healthy - nodeID := node.GetSession().ServerID + nodeID := paramtable.GetNodeID() if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) { err := fmt.Errorf("query node %d is not ready", nodeID) status := &commonpb.Status{ @@ -429,7 +430,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC // LoadSegments load historical data into query node, historical data can be vector data or index func (node *QueryNode) LoadSegments(ctx context.Context, in *querypb.LoadSegmentsRequest) (*commonpb.Status, error) { - nodeID := node.GetSession().ServerID + nodeID := paramtable.GetNodeID() // check node healthy if !node.lifetime.Add(commonpbutil.IsHealthy) { err := fmt.Errorf("query node %d is not ready", nodeID) @@ -610,7 +611,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *querypb.Releas // ReleaseSegments remove the specified segments from query node according segmentIDs, partitionIDs, and collectionID func (node *QueryNode) ReleaseSegments(ctx context.Context, in *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) { - nodeID := node.GetSession().ServerID + nodeID := paramtable.GetNodeID() if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) { err := fmt.Errorf("query node %d is not ready", nodeID) status := &commonpb.Status{ @@ -707,7 +708,7 @@ func filterSegmentInfo(segmentInfos []*querypb.SegmentInfo, segmentIDs map[int64 // Search performs replica search tasks. func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) { - nodeID := node.GetSession().ServerID + nodeID := paramtable.GetNodeID() if !node.IsStandAlone && req.GetReq().GetBase().GetTargetID() != nodeID { return &internalpb.SearchResults{ Status: &commonpb.Status{ @@ -789,7 +790,7 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) ( } func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.SearchRequest, dmlChannel string) (*internalpb.SearchResults, error) { - nodeID := node.GetSession().ServerID + nodeID := paramtable.GetNodeID() metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(nodeID), metrics.SearchLabel, metrics.TotalLabel).Inc() failRet := &internalpb.SearchResults{ Status: &commonpb.Status{ @@ -936,7 +937,7 @@ func (node *QueryNode) searchWithDmlChannel(ctx context.Context, req *querypb.Se } func (node *QueryNode) queryWithDmlChannel(ctx context.Context, req *querypb.QueryRequest, dmlChannel string) (*internalpb.RetrieveResults, error) { - nodeID := node.GetSession().ServerID + nodeID := paramtable.GetNodeID() metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(nodeID), metrics.QueryLabel, metrics.TotalLabel).Inc() failRet := &internalpb.RetrieveResults{ Status: &commonpb.Status{ @@ -1091,7 +1092,7 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i zap.Uint64("guaranteeTimestamp", req.Req.GetGuaranteeTimestamp()), zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp())) - nodeID := node.GetSession().ServerID + nodeID := paramtable.GetNodeID() if req.GetReq().GetBase().GetTargetID() != nodeID { return &internalpb.RetrieveResults{ Status: &commonpb.Status{ @@ -1194,7 +1195,7 @@ func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.Syn // ShowConfigurations returns the configurations of queryNode matching req.Pattern func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { - nodeID := node.GetSession().ServerID + nodeID := paramtable.GetNodeID() if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) { log.Warn("QueryNode.ShowConfigurations failed", zap.Int64("nodeId", nodeID), @@ -1231,7 +1232,7 @@ func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.S // GetMetrics return system infos of the query node, such as total memory, memory usage, cpu usage ... func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { - nodeID := node.GetSession().ServerID + nodeID := paramtable.GetNodeID() if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) { log.Ctx(ctx).Warn("QueryNode.GetMetrics failed", zap.Int64("nodeId", nodeID), @@ -1296,10 +1297,10 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR } func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.GetDataDistributionRequest) (*querypb.GetDataDistributionResponse, error) { - nodeID := node.GetSession().ServerID + nodeID := paramtable.GetNodeID() log := log.With( - zap.Int64("msg-id", req.GetBase().GetMsgID()), - zap.Int64("node-id", nodeID), + zap.Int64("msgID", req.GetBase().GetMsgID()), + zap.Int64("nodeID", nodeID), ) if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) { log.Warn("QueryNode.GetMetrics failed", @@ -1391,7 +1392,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) { log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()), zap.String("channel", req.GetChannel())) - nodeID := node.GetSession().ServerID + nodeID := paramtable.GetNodeID() // check node healthy if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) { err := fmt.Errorf("query node %d is not ready", nodeID)