From f33a89387f3e992c0935be76c33780a0bdebca1e Mon Sep 17 00:00:00 2001 From: wei liu Date: Wed, 9 Aug 2023 11:37:15 +0800 Subject: [PATCH] enable config shard level cost metrics whether contains worker's cost (#26132) Signed-off-by: Wei Liu --- internal/querynodev2/segments/result.go | 24 ++++++++++++++++----- internal/querynodev2/tasks/query_task.go | 3 +++ internal/querynodev2/tasks/task.go | 6 ++++++ pkg/util/paramtable/component_param.go | 10 +++++++++ pkg/util/paramtable/component_param_test.go | 2 ++ 5 files changed, 40 insertions(+), 5 deletions(-) diff --git a/internal/querynodev2/segments/result.go b/internal/querynodev2/segments/result.go index 4b09e3d0c6..ad69a5d78c 100644 --- a/internal/querynodev2/segments/result.go +++ b/internal/querynodev2/segments/result.go @@ -32,6 +32,7 @@ import ( typeutil2 "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -75,8 +76,16 @@ func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResult return nil, err } - requestCosts := lo.Map(results, func(result *internalpb.SearchResults, _ int) *internalpb.CostAggregation { - return result.GetCostAggregation() + requestCosts := lo.FilterMap(results, func(result *internalpb.SearchResults, _ int) (*internalpb.CostAggregation, bool) { + if paramtable.Get().QueryNodeCfg.EnableWorkerSQCostMetrics.GetAsBool() { + return result.GetCostAggregation(), true + } + + if result.GetBase().GetSourceID() == paramtable.GetNodeID() { + return result.GetCostAggregation(), true + } + + return nil, false }) searchResults.CostAggregation = mergeRequestCost(requestCosts) @@ -289,10 +298,15 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna } requestCosts := lo.FilterMap(retrieveResults, func(result *internalpb.RetrieveResults, _ int) (*internalpb.CostAggregation, bool) { - if result.CostAggregation == nil { - return nil, false + if paramtable.Get().QueryNodeCfg.EnableWorkerSQCostMetrics.GetAsBool() { + return result.GetCostAggregation(), true } - return result.CostAggregation, true + + if result.GetBase().GetSourceID() == paramtable.GetNodeID() { + return result.GetCostAggregation(), true + } + + return nil, false }) ret.CostAggregation = mergeRequestCost(requestCosts) diff --git a/internal/querynodev2/tasks/query_task.go b/internal/querynodev2/tasks/query_task.go index e1f9cc337b..e56319e511 100644 --- a/internal/querynodev2/tasks/query_task.go +++ b/internal/querynodev2/tasks/query_task.go @@ -123,6 +123,9 @@ func (t *QueryTask) Execute() error { } t.result = &internalpb.RetrieveResults{ + Base: &commonpb.MsgBase{ + SourceID: paramtable.GetNodeID(), + }, Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, Ids: reducedResult.Ids, FieldsData: reducedResult.FieldsData, diff --git a/internal/querynodev2/tasks/task.go b/internal/querynodev2/tasks/task.go index 7d2be71eb7..68b4b4cfa5 100644 --- a/internal/querynodev2/tasks/task.go +++ b/internal/querynodev2/tasks/task.go @@ -159,6 +159,9 @@ func (t *SearchTask) Execute() error { } task.result = &internalpb.SearchResults{ + Base: &commonpb.MsgBase{ + SourceID: paramtable.GetNodeID(), + }, Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, MetricType: req.GetReq().GetMetricType(), NumQueries: t.originNqs[i], @@ -212,6 +215,9 @@ func (t *SearchTask) Execute() error { Observe(float64(reduceLatency.Milliseconds())) task.result = &internalpb.SearchResults{ + Base: &commonpb.MsgBase{ + SourceID: paramtable.GetNodeID(), + }, Status: util.WrapStatus(commonpb.ErrorCode_Success, ""), MetricType: req.GetReq().GetMetricType(), NumQueries: t.originNqs[i], diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 1766a53fc8..de12f2f6ca 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1563,6 +1563,8 @@ type queryNodeConfig struct { // CGOPoolSize ratio to MaxReadConcurrency CGOPoolSizeRatio ParamItem `refreshable:"false"` + + EnableWorkerSQCostMetrics ParamItem `refreshable:"true"` } func (p *queryNodeConfig) init(base *BaseTable) { @@ -1925,6 +1927,14 @@ Max read concurrency must greater than or equal to 1, and less than or equal to Doc: "cgo pool size ratio to max read concurrency", } p.CGOPoolSizeRatio.Init(base.mgr) + + p.EnableWorkerSQCostMetrics = ParamItem{ + Key: "queryNode.enableWorkerSQCostMetrics", + Version: "2.3.0", + DefaultValue: "false", + Doc: "whether use worker's cost to measure delegator's workload", + } + p.EnableWorkerSQCostMetrics.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 073981b5b6..248cb67786 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -339,6 +339,8 @@ func TestComponentParam(t *testing.T) { params.Save("queryNode.gracefulStopTimeout", "100") gracefulStopTimeout := Params.GracefulStopTimeout assert.Equal(t, int64(100), gracefulStopTimeout.GetAsInt64()) + + assert.Equal(t, false, Params.EnableWorkerSQCostMetrics.GetAsBool()) }) t.Run("test dataCoordConfig", func(t *testing.T) {