Add a config option controlling whether shard-level cost metrics include workers' costs (#26132)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2023-08-09 11:37:15 +08:00 committed by GitHub
parent 2770ac4df5
commit f33a89387f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 40 additions and 5 deletions

View File

@ -32,6 +32,7 @@ import (
typeutil2 "github.com/milvus-io/milvus/internal/util/typeutil" typeutil2 "github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/milvus-io/milvus/pkg/util/typeutil"
) )
@ -75,8 +76,16 @@ func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResult
return nil, err return nil, err
} }
requestCosts := lo.Map(results, func(result *internalpb.SearchResults, _ int) *internalpb.CostAggregation { requestCosts := lo.FilterMap(results, func(result *internalpb.SearchResults, _ int) (*internalpb.CostAggregation, bool) {
return result.GetCostAggregation() if paramtable.Get().QueryNodeCfg.EnableWorkerSQCostMetrics.GetAsBool() {
return result.GetCostAggregation(), true
}
if result.GetBase().GetSourceID() == paramtable.GetNodeID() {
return result.GetCostAggregation(), true
}
return nil, false
}) })
searchResults.CostAggregation = mergeRequestCost(requestCosts) searchResults.CostAggregation = mergeRequestCost(requestCosts)
@ -289,10 +298,15 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna
} }
requestCosts := lo.FilterMap(retrieveResults, func(result *internalpb.RetrieveResults, _ int) (*internalpb.CostAggregation, bool) { requestCosts := lo.FilterMap(retrieveResults, func(result *internalpb.RetrieveResults, _ int) (*internalpb.CostAggregation, bool) {
if result.CostAggregation == nil { if paramtable.Get().QueryNodeCfg.EnableWorkerSQCostMetrics.GetAsBool() {
return nil, false return result.GetCostAggregation(), true
} }
return result.CostAggregation, true
if result.GetBase().GetSourceID() == paramtable.GetNodeID() {
return result.GetCostAggregation(), true
}
return nil, false
}) })
ret.CostAggregation = mergeRequestCost(requestCosts) ret.CostAggregation = mergeRequestCost(requestCosts)

View File

@ -123,6 +123,9 @@ func (t *QueryTask) Execute() error {
} }
t.result = &internalpb.RetrieveResults{ t.result = &internalpb.RetrieveResults{
Base: &commonpb.MsgBase{
SourceID: paramtable.GetNodeID(),
},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Ids: reducedResult.Ids, Ids: reducedResult.Ids,
FieldsData: reducedResult.FieldsData, FieldsData: reducedResult.FieldsData,

View File

@ -159,6 +159,9 @@ func (t *SearchTask) Execute() error {
} }
task.result = &internalpb.SearchResults{ task.result = &internalpb.SearchResults{
Base: &commonpb.MsgBase{
SourceID: paramtable.GetNodeID(),
},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
MetricType: req.GetReq().GetMetricType(), MetricType: req.GetReq().GetMetricType(),
NumQueries: t.originNqs[i], NumQueries: t.originNqs[i],
@ -212,6 +215,9 @@ func (t *SearchTask) Execute() error {
Observe(float64(reduceLatency.Milliseconds())) Observe(float64(reduceLatency.Milliseconds()))
task.result = &internalpb.SearchResults{ task.result = &internalpb.SearchResults{
Base: &commonpb.MsgBase{
SourceID: paramtable.GetNodeID(),
},
Status: util.WrapStatus(commonpb.ErrorCode_Success, ""), Status: util.WrapStatus(commonpb.ErrorCode_Success, ""),
MetricType: req.GetReq().GetMetricType(), MetricType: req.GetReq().GetMetricType(),
NumQueries: t.originNqs[i], NumQueries: t.originNqs[i],

View File

@ -1563,6 +1563,8 @@ type queryNodeConfig struct {
// CGOPoolSize ratio to MaxReadConcurrency // CGOPoolSize ratio to MaxReadConcurrency
CGOPoolSizeRatio ParamItem `refreshable:"false"` CGOPoolSizeRatio ParamItem `refreshable:"false"`
EnableWorkerSQCostMetrics ParamItem `refreshable:"true"`
} }
func (p *queryNodeConfig) init(base *BaseTable) { func (p *queryNodeConfig) init(base *BaseTable) {
@ -1925,6 +1927,14 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
Doc: "cgo pool size ratio to max read concurrency", Doc: "cgo pool size ratio to max read concurrency",
} }
p.CGOPoolSizeRatio.Init(base.mgr) p.CGOPoolSizeRatio.Init(base.mgr)
p.EnableWorkerSQCostMetrics = ParamItem{
Key: "queryNode.enableWorkerSQCostMetrics",
Version: "2.3.0",
DefaultValue: "false",
Doc: "whether use worker's cost to measure delegator's workload",
}
p.EnableWorkerSQCostMetrics.Init(base.mgr)
} }
// ///////////////////////////////////////////////////////////////////////////// // /////////////////////////////////////////////////////////////////////////////

View File

@ -339,6 +339,8 @@ func TestComponentParam(t *testing.T) {
params.Save("queryNode.gracefulStopTimeout", "100") params.Save("queryNode.gracefulStopTimeout", "100")
gracefulStopTimeout := Params.GracefulStopTimeout gracefulStopTimeout := Params.GracefulStopTimeout
assert.Equal(t, int64(100), gracefulStopTimeout.GetAsInt64()) assert.Equal(t, int64(100), gracefulStopTimeout.GetAsInt64())
assert.Equal(t, false, Params.EnableWorkerSQCostMetrics.GetAsBool())
}) })
t.Run("test dataCoordConfig", func(t *testing.T) { t.Run("test dataCoordConfig", func(t *testing.T) {