mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-07 01:28:27 +08:00
fix: cost metrics collection logic for replica selection (#41965)
issue: #41621 - Deprecate EnableWorkerSQCostMetrics parameter - Always collect cost metrics from all search and retrieve results - Update code with comments explaining the changes rationale Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
8a85bc4213
commit
dad43a3894
1
.gitignore
vendored
1
.gitignore
vendored
@ -26,6 +26,7 @@ tests/python_client/default.etcd/
|
|||||||
.vscode
|
.vscode
|
||||||
docker-compose-devcontainer.yml
|
docker-compose-devcontainer.yml
|
||||||
docker-compose-devcontainer.yml.tmp
|
docker-compose-devcontainer.yml.tmp
|
||||||
|
.cursor
|
||||||
|
|
||||||
*.code-workspace
|
*.code-workspace
|
||||||
|
|
||||||
|
|||||||
@ -110,15 +110,10 @@ func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResult
|
|||||||
}
|
}
|
||||||
|
|
||||||
requestCosts := lo.FilterMap(results, func(result *internalpb.SearchResults, _ int) (*internalpb.CostAggregation, bool) {
|
requestCosts := lo.FilterMap(results, func(result *internalpb.SearchResults, _ int) (*internalpb.CostAggregation, bool) {
|
||||||
if paramtable.Get().QueryNodeCfg.EnableWorkerSQCostMetrics.GetAsBool() {
|
// delegator node won't be used to load sealed segment if stream node is enabled
|
||||||
|
// and if growing segment doesn't exists, delegator won't produce any cost metrics
|
||||||
|
// so we deprecate the EnableWorkerSQCostMetrics param
|
||||||
return result.GetCostAggregation(), true
|
return result.GetCostAggregation(), true
|
||||||
}
|
|
||||||
|
|
||||||
if result.GetBase().GetSourceID() == paramtable.GetNodeID() {
|
|
||||||
return result.GetCostAggregation(), true
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, false
|
|
||||||
})
|
})
|
||||||
searchResults.CostAggregation = mergeRequestCost(requestCosts)
|
searchResults.CostAggregation = mergeRequestCost(requestCosts)
|
||||||
if searchResults.CostAggregation == nil {
|
if searchResults.CostAggregation == nil {
|
||||||
@ -169,15 +164,10 @@ func ReduceAdvancedSearchResults(ctx context.Context, results []*internalpb.Sear
|
|||||||
}
|
}
|
||||||
searchResults.ChannelsMvcc = channelsMvcc
|
searchResults.ChannelsMvcc = channelsMvcc
|
||||||
requestCosts := lo.FilterMap(results, func(result *internalpb.SearchResults, _ int) (*internalpb.CostAggregation, bool) {
|
requestCosts := lo.FilterMap(results, func(result *internalpb.SearchResults, _ int) (*internalpb.CostAggregation, bool) {
|
||||||
if paramtable.Get().QueryNodeCfg.EnableWorkerSQCostMetrics.GetAsBool() {
|
// delegator node won't be used to load sealed segment if stream node is enabled
|
||||||
|
// and if growing segment doesn't exists, delegator won't produce any cost metrics
|
||||||
|
// so we deprecate the EnableWorkerSQCostMetrics param
|
||||||
return result.GetCostAggregation(), true
|
return result.GetCostAggregation(), true
|
||||||
}
|
|
||||||
|
|
||||||
if result.GetBase().GetSourceID() == paramtable.GetNodeID() {
|
|
||||||
return result.GetCostAggregation(), true
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, false
|
|
||||||
})
|
})
|
||||||
searchResults.CostAggregation = mergeRequestCost(requestCosts)
|
searchResults.CostAggregation = mergeRequestCost(requestCosts)
|
||||||
if searchResults.CostAggregation == nil {
|
if searchResults.CostAggregation == nil {
|
||||||
@ -356,15 +346,10 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
|
|||||||
}
|
}
|
||||||
|
|
||||||
requestCosts := lo.FilterMap(retrieveResults, func(result *internalpb.RetrieveResults, _ int) (*internalpb.CostAggregation, bool) {
|
requestCosts := lo.FilterMap(retrieveResults, func(result *internalpb.RetrieveResults, _ int) (*internalpb.CostAggregation, bool) {
|
||||||
if paramtable.Get().QueryNodeCfg.EnableWorkerSQCostMetrics.GetAsBool() {
|
// delegator node won't be used to load sealed segment if stream node is enabled
|
||||||
|
// and if growing segment doesn't exists, delegator won't produce any cost metrics
|
||||||
|
// so we deprecate the EnableWorkerSQCostMetrics param
|
||||||
return result.GetCostAggregation(), true
|
return result.GetCostAggregation(), true
|
||||||
}
|
|
||||||
|
|
||||||
if result.GetBase().GetSourceID() == paramtable.GetNodeID() {
|
|
||||||
return result.GetCostAggregation(), true
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, false
|
|
||||||
})
|
})
|
||||||
ret.CostAggregation = mergeRequestCost(requestCosts)
|
ret.CostAggregation = mergeRequestCost(requestCosts)
|
||||||
if ret.CostAggregation == nil {
|
if ret.CostAggregation == nil {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user