From dad43a3894a687f3ea87d80712e1d042a7bf78cf Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 22 May 2025 10:20:25 +0800 Subject: [PATCH] fix: cost metrics collection logic for replica selection (#41965) issue: #41621 - Deprecate EnableWorkerSQCostMetrics parameter - Always collect cost metrics from all search and retrieve results - Update code with comments explaining the changes rationale Signed-off-by: Wei Liu --- .gitignore | 1 + internal/querynodev2/segments/result.go | 39 ++++++++----------------- 2 files changed, 13 insertions(+), 27 deletions(-) diff --git a/.gitignore b/.gitignore index 6f47fb5ec2..2e8a46044e 100644 --- a/.gitignore +++ b/.gitignore @@ -26,6 +26,7 @@ tests/python_client/default.etcd/ .vscode docker-compose-devcontainer.yml docker-compose-devcontainer.yml.tmp +.cursor *.code-workspace diff --git a/internal/querynodev2/segments/result.go b/internal/querynodev2/segments/result.go index daa731a8cc..d60d9c1a3c 100644 --- a/internal/querynodev2/segments/result.go +++ b/internal/querynodev2/segments/result.go @@ -110,15 +110,10 @@ func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResult } requestCosts := lo.FilterMap(results, func(result *internalpb.SearchResults, _ int) (*internalpb.CostAggregation, bool) { - if paramtable.Get().QueryNodeCfg.EnableWorkerSQCostMetrics.GetAsBool() { - return result.GetCostAggregation(), true - } - - if result.GetBase().GetSourceID() == paramtable.GetNodeID() { - return result.GetCostAggregation(), true - } - - return nil, false + // delegator node won't be used to load sealed segment if stream node is enabled + // and if growing segment doesn't exists, delegator won't produce any cost metrics + // so we deprecate the EnableWorkerSQCostMetrics param + return result.GetCostAggregation(), true }) searchResults.CostAggregation = mergeRequestCost(requestCosts) if searchResults.CostAggregation == nil { @@ -169,15 +164,10 @@ func ReduceAdvancedSearchResults(ctx context.Context, results []*internalpb.Sear } searchResults.ChannelsMvcc = channelsMvcc requestCosts := lo.FilterMap(results, func(result *internalpb.SearchResults, _ int) (*internalpb.CostAggregation, bool) { - if paramtable.Get().QueryNodeCfg.EnableWorkerSQCostMetrics.GetAsBool() { - return result.GetCostAggregation(), true - } - - if result.GetBase().GetSourceID() == paramtable.GetNodeID() { - return result.GetCostAggregation(), true - } - - return nil, false + // delegator node won't be used to load sealed segment if stream node is enabled + // and if growing segment doesn't exists, delegator won't produce any cost metrics + // so we deprecate the EnableWorkerSQCostMetrics param + return result.GetCostAggregation(), true }) searchResults.CostAggregation = mergeRequestCost(requestCosts) if searchResults.CostAggregation == nil { @@ -356,15 +346,10 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna } requestCosts := lo.FilterMap(retrieveResults, func(result *internalpb.RetrieveResults, _ int) (*internalpb.CostAggregation, bool) { - if paramtable.Get().QueryNodeCfg.EnableWorkerSQCostMetrics.GetAsBool() { - return result.GetCostAggregation(), true - } - - if result.GetBase().GetSourceID() == paramtable.GetNodeID() { - return result.GetCostAggregation(), true - } - - return nil, false + // delegator node won't be used to load sealed segment if stream node is enabled + // and if growing segment doesn't exists, delegator won't produce any cost metrics + // so we deprecate the EnableWorkerSQCostMetrics param + return result.GetCostAggregation(), true }) ret.CostAggregation = mergeRequestCost(requestCosts) if ret.CostAggregation == nil {