From 90bed1caf984dfd383192a4e9a7ffd1d4aadd7a4 Mon Sep 17 00:00:00 2001 From: SimFG Date: Wed, 10 Apr 2024 15:07:17 +0800 Subject: [PATCH] enhance: add the related data size for the read apis (#31816) issue: #30436 origin pr: #30438 related pr: #31772 --------- Signed-off-by: SimFG --- internal/datacoord/metrics_info.go | 2 +- internal/proto/internal.proto | 1 + internal/proxy/impl.go | 173 +++++++++++------- internal/proxy/task_query.go | 13 +- internal/proxy/task_search.go | 3 + .../querynodev2/segments/count_reducer.go | 5 + internal/querynodev2/segments/result.go | 46 +++-- internal/querynodev2/segments/retrieve.go | 9 +- internal/querynodev2/services.go | 9 +- internal/querynodev2/tasks/query_task.go | 8 +- internal/querynodev2/tasks/search_task.go | 8 +- internal/util/hookutil/constant.go | 19 +- pkg/metrics/proxy_metrics.go | 10 + pkg/util/paramtable/runtime.go | 5 + pkg/util/typeutil/get_dim.go | 11 -- .../hellomilvus/hello_milvus_test.go | 8 +- .../partitionkey/partition_key_test.go | 12 +- 17 files changed, 220 insertions(+), 122 deletions(-) diff --git a/internal/datacoord/metrics_info.go b/internal/datacoord/metrics_info.go index 78b4890af8..61bc46bae2 100644 --- a/internal/datacoord/metrics_info.go +++ b/internal/datacoord/metrics_info.go @@ -73,7 +73,7 @@ func (s *Server) getCollectionMetrics(ctx context.Context) *metricsinfo.DataCoor ret.Collections[collectionID].IndexInfo = append(ret.Collections[collectionID].IndexInfo, &metricsinfo.DataCoordIndexInfo{ NumEntitiesIndexed: info.GetIndexedRows(), IndexName: info.GetIndexName(), - FieldID: info.GetIndexID(), + FieldID: info.GetFieldID(), }) } } diff --git a/internal/proto/internal.proto b/internal/proto/internal.proto index f028b2e7f2..93544a5288 100644 --- a/internal/proto/internal.proto +++ b/internal/proto/internal.proto @@ -161,6 +161,7 @@ message CostAggregation { int64 responseTime = 1; int64 serviceTime = 2; int64 totalNQ = 3; + int64 totalRelatedDataSize = 4; } message RetrieveRequest { diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index abb3962608..aaaee8ffc6 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2436,23 +2436,31 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() successCnt := it.result.InsertCnt - int64(len(it.result.ErrIndex)) + username := GetCurUserFromContextOrDefault(ctx) + nodeID := paramtable.GetStringNodeID() + dbName := request.DbName + collectionName := request.CollectionName + v := Extension.Report(map[string]any{ - hookutil.OpTypeKey: hookutil.OpTypeInsert, - hookutil.DatabaseKey: request.DbName, - hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx), - hookutil.DataSizeKey: proto.Size(request), - hookutil.SuccessCntKey: successCnt, - hookutil.FailCntKey: len(it.result.ErrIndex), + hookutil.OpTypeKey: hookutil.OpTypeInsert, + hookutil.DatabaseKey: dbName, + hookutil.UsernameKey: username, + hookutil.RequestDataSizeKey: proto.Size(request), + hookutil.SuccessCntKey: successCnt, + hookutil.FailCntKey: len(it.result.ErrIndex), }) SetReportValue(it.result.GetStatus(), v) + if merr.Ok(it.result.GetStatus()) { + metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeInsert, request.DbName, username).Add(float64(v)) + } metrics.ProxyInsertVectors. - WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()). + WithLabelValues(nodeID, dbName, collectionName). Add(float64(successCnt)) metrics.ProxyMutationLatency. - WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.GetDbName(), request.GetCollectionName()). + WithLabelValues(nodeID, metrics.InsertLabel, dbName, collectionName). Observe(float64(tr.ElapseSpan().Milliseconds())) metrics.ProxyCollectionMutationLatency. - WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.CollectionName). + WithLabelValues(nodeID, metrics.InsertLabel, collectionName). Observe(float64(tr.ElapseSpan().Milliseconds())) return it.result, nil } @@ -2511,7 +2519,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) log.Debug("Run delete in Proxy") if err := dr.Run(ctx); err != nil { - log.Error("Failed to enqueue delete task: " + err.Error()) + log.Error("Failed to run delete task: " + err.Error()) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc() @@ -2526,21 +2534,28 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) successCnt := dr.result.GetDeleteCnt() metrics.ProxyDeleteVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt)) + username := GetCurUserFromContextOrDefault(ctx) + nodeID := paramtable.GetStringNodeID() + dbName := request.DbName + collectionName := request.CollectionName v := Extension.Report(map[string]any{ hookutil.OpTypeKey: hookutil.OpTypeDelete, - hookutil.DatabaseKey: request.DbName, - hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx), + hookutil.DatabaseKey: dbName, + hookutil.UsernameKey: username, hookutil.SuccessCntKey: successCnt, hookutil.RelatedCntKey: dr.allQueryCnt.Load(), }) SetReportValue(dr.result.GetStatus(), v) - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() + if merr.Ok(dr.result.GetStatus()) { + metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeDelete, dbName, username).Add(float64(v)) + } + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, + metrics.SuccessLabel, dbName, collectionName).Inc() metrics.ProxyMutationLatency. - WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.GetDbName(), request.GetCollectionName()). + WithLabelValues(nodeID, metrics.DeleteLabel, dbName, collectionName). Observe(float64(tr.ElapseSpan().Milliseconds())) - metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyCollectionMutationLatency.WithLabelValues(nodeID, metrics.DeleteLabel, collectionName).Observe(float64(tr.ElapseSpan().Milliseconds())) return dr.result, nil } @@ -2652,28 +2667,34 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) // UpsertCnt always equals to the number of entities in the request it.result.UpsertCnt = int64(request.NumRows) + username := GetCurUserFromContextOrDefault(ctx) + nodeID := paramtable.GetStringNodeID() + dbName := request.DbName + collectionName := request.CollectionName v := Extension.Report(map[string]any{ - hookutil.OpTypeKey: hookutil.OpTypeUpsert, - hookutil.DatabaseKey: request.DbName, - hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx), - hookutil.DataSizeKey: proto.Size(it.req), - hookutil.SuccessCntKey: it.result.UpsertCnt, - hookutil.FailCntKey: len(it.result.ErrIndex), + hookutil.OpTypeKey: hookutil.OpTypeUpsert, + hookutil.DatabaseKey: request.DbName, + hookutil.UsernameKey: username, + hookutil.RequestDataSizeKey: proto.Size(it.req), + hookutil.SuccessCntKey: it.result.UpsertCnt, + hookutil.FailCntKey: len(it.result.ErrIndex), }) SetReportValue(it.result.GetStatus(), v) - rateCol.Add(internalpb.RateType_DMLUpsert.String(), float64(it.upsertMsg.DeleteMsg.Size()+it.upsertMsg.DeleteMsg.Size())) - - metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, - metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() + rateCol.Add(internalpb.RateType_DMLUpsert.String(), float64(it.upsertMsg.InsertMsg.Size()+it.upsertMsg.DeleteMsg.Size())) + if merr.Ok(it.result.GetStatus()) { + metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeUpsert, dbName, username).Add(float64(v)) + } + metrics.ProxyFunctionCall.WithLabelValues(nodeID, method, + metrics.SuccessLabel, dbName, collectionName).Inc() successCnt := it.result.UpsertCnt - int64(len(it.result.ErrIndex)) metrics.ProxyUpsertVectors. - WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()). + WithLabelValues(nodeID, dbName, collectionName). Add(float64(successCnt)) metrics.ProxyMutationLatency. - WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.GetDbName(), request.GetCollectionName()). + WithLabelValues(nodeID, metrics.UpsertLabel, dbName, collectionName). Observe(float64(tr.ElapseSpan().Milliseconds())) - metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyCollectionMutationLatency.WithLabelValues(nodeID, metrics.UpsertLabel, collectionName).Observe(float64(tr.ElapseSpan().Milliseconds())) log.Debug("Finish processing upsert request in Proxy") return it.result, nil @@ -2686,7 +2707,8 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) Status: merr.Success(), } err2 := retry.Handle(ctx, func() (bool, error) { - rsp, err = node.search(ctx, request) + rsp, err = node. + search(ctx, request) if errors.Is(merr.Error(rsp.GetStatus()), merr.ErrInconsistentRequery) { return true, merr.Error(rsp.GetStatus()) } @@ -2835,8 +2857,11 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) } span := tr.CtxRecord(ctx, "wait search result") + nodeID := paramtable.GetStringNodeID() + dbName := request.DbName + collectionName := request.CollectionName metrics.ProxyWaitForSearchResultLatency.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), + nodeID, metrics.SearchLabel, ).Observe(float64(span.Milliseconds())) @@ -2844,42 +2869,47 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) log.Debug(rpcDone(method)) metrics.ProxyFunctionCall.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), + nodeID, method, metrics.SuccessLabel, - request.GetDbName(), - request.GetCollectionName(), + dbName, + collectionName, ).Inc() metrics.ProxySearchVectors. - WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()). + WithLabelValues(nodeID, dbName, collectionName). Add(float64(qt.result.GetResults().GetNumQueries())) searchDur := tr.ElapseSpan().Milliseconds() metrics.ProxySQLatency.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), + nodeID, metrics.SearchLabel, - request.GetDbName(), - request.GetCollectionName(), + dbName, + collectionName, ).Observe(float64(searchDur)) metrics.ProxyCollectionSQLatency.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), + nodeID, metrics.SearchLabel, - request.CollectionName, + collectionName, ).Observe(float64(searchDur)) if qt.result != nil { + username := GetCurUserFromContextOrDefault(ctx) sentSize := proto.Size(qt.result) v := Extension.Report(map[string]any{ - hookutil.OpTypeKey: hookutil.OpTypeSearch, - hookutil.DatabaseKey: request.DbName, - hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx), - hookutil.DataSizeKey: sentSize, - hookutil.RelatedCntKey: qt.result.GetResults().GetAllSearchCount(), + hookutil.OpTypeKey: hookutil.OpTypeSearch, + hookutil.DatabaseKey: dbName, + hookutil.UsernameKey: username, + hookutil.ResultDataSizeKey: sentSize, + hookutil.RelatedDataSizeKey: qt.relatedDataSize, + hookutil.RelatedCntKey: qt.result.GetResults().GetAllSearchCount(), }) SetReportValue(qt.result.GetStatus(), v) - metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize)) + if merr.Ok(qt.result.GetStatus()) { + metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeSearch, dbName, username).Add(float64(v)) + } + metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize)) rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize)) } return qt.result, nil @@ -3016,8 +3046,11 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea } span := tr.CtxRecord(ctx, "wait hybrid search result") + nodeID := paramtable.GetStringNodeID() + dbName := request.DbName + collectionName := request.CollectionName metrics.ProxyWaitForSearchResultLatency.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), + nodeID, metrics.HybridSearchLabel, ).Observe(float64(span.Milliseconds())) @@ -3025,7 +3058,7 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea log.Debug(rpcDone(method)) metrics.ProxyFunctionCall.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), + nodeID, method, metrics.SuccessLabel, request.GetDbName(), @@ -3033,34 +3066,39 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea ).Inc() metrics.ProxySearchVectors. - WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()). + WithLabelValues(nodeID, dbName, collectionName). Add(float64(len(request.GetRequests()))) searchDur := tr.ElapseSpan().Milliseconds() metrics.ProxySQLatency.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), + nodeID, metrics.HybridSearchLabel, - request.GetDbName(), - request.GetCollectionName(), + dbName, + collectionName, ).Observe(float64(searchDur)) metrics.ProxyCollectionSQLatency.WithLabelValues( - strconv.FormatInt(paramtable.GetNodeID(), 10), + nodeID, metrics.HybridSearchLabel, - request.CollectionName, + collectionName, ).Observe(float64(searchDur)) if qt.result != nil { sentSize := proto.Size(qt.result) + username := GetCurUserFromContextOrDefault(ctx) v := Extension.Report(map[string]any{ - hookutil.OpTypeKey: hookutil.OpTypeHybridSearch, - hookutil.DatabaseKey: request.DbName, - hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx), - hookutil.DataSizeKey: sentSize, - hookutil.RelatedCntKey: qt.result.GetResults().GetAllSearchCount(), + hookutil.OpTypeKey: hookutil.OpTypeHybridSearch, + hookutil.DatabaseKey: dbName, + hookutil.UsernameKey: username, + hookutil.ResultDataSizeKey: sentSize, + hookutil.RelatedDataSizeKey: qt.relatedDataSize, + hookutil.RelatedCntKey: qt.result.GetResults().GetAllSearchCount(), }) SetReportValue(qt.result.GetStatus(), v) - metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize)) + if merr.Ok(qt.result.GetStatus()) { + metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeHybridSearch, dbName, username).Add(float64(v)) + } + metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize)) rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize)) } return qt.result, nil @@ -3350,15 +3388,18 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* } res, err := node.query(ctx, qt) if merr.Ok(res.Status) && err == nil { + username := GetCurUserFromContextOrDefault(ctx) + nodeID := paramtable.GetStringNodeID() v := Extension.Report(map[string]any{ - hookutil.OpTypeKey: hookutil.OpTypeQuery, - hookutil.DatabaseKey: request.DbName, - hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx), - hookutil.DataSizeKey: proto.Size(res), - hookutil.RelatedCntKey: qt.allQueryCnt, - hookutil.DimensionKey: qt.dimension, + hookutil.OpTypeKey: hookutil.OpTypeQuery, + hookutil.DatabaseKey: request.DbName, + hookutil.UsernameKey: username, + hookutil.ResultDataSizeKey: proto.Size(res), + hookutil.RelatedDataSizeKey: qt.totalRelatedDataSize, + hookutil.RelatedCntKey: qt.allQueryCnt, }) SetReportValue(res.Status, v) + metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeQuery, request.DbName, username).Add(float64(v)) } return res, err } diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index dcd71baf31..8b5d45224a 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -54,7 +54,6 @@ type queryTask struct { collectionName string queryParams *queryParams schema *schemaInfo - dimension int64 userOutputFields []string @@ -66,8 +65,9 @@ type queryTask struct { channelsMvcc map[string]Timestamp fastSkip bool - reQuery bool - allQueryCnt int64 + reQuery bool + allQueryCnt int64 + totalRelatedDataSize int64 } type queryParams struct { @@ -341,11 +341,6 @@ func (t *queryTask) PreExecute(ctx context.Context) error { return err } t.schema = schema - t.dimension, err = typeutil.GetCollectionDim(t.schema.CollectionSchema) - if err != nil { - log.Warn("get collection dimension failed", zap.Error(err)) - return err - } if t.ids != nil { pkField := "" @@ -481,6 +476,7 @@ func (t *queryTask) PostExecute(ctx context.Context) error { toReduceResults := make([]*internalpb.RetrieveResults, 0) t.allQueryCnt = 0 + t.totalRelatedDataSize = 0 select { case <-t.TraceCtx().Done(): log.Warn("proxy", zap.Int64("Query: wait to finish failed, timeout!, msgID:", t.ID())) @@ -490,6 +486,7 @@ func (t *queryTask) PostExecute(ctx context.Context) error { t.resultBuf.Range(func(res *internalpb.RetrieveResults) bool { toReduceResults = append(toReduceResults, res) t.allQueryCnt += res.GetAllRetrieveCount() + t.totalRelatedDataSize += res.GetCostAggregation().GetTotalRelatedDataSize() log.Debug("proxy receives one query result", zap.Int64("sourceID", res.GetBase().GetSourceID())) return true }) diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index 269cf84f9d..8cbb59764f 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -73,6 +73,7 @@ type searchTask struct { lb LBPolicy queryChannelsTs map[string]Timestamp queryInfos []*planpb.QueryInfo + relatedDataSize int64 reScorers []reScorer rankParams *rankParams @@ -561,7 +562,9 @@ func (t *searchTask) PostExecute(ctx context.Context) error { } t.queryChannelsTs = make(map[string]uint64) + t.relatedDataSize = 0 for _, r := range toReduceResults { + t.relatedDataSize += r.GetCostAggregation().GetTotalRelatedDataSize() for ch, ts := range r.GetChannelsMvcc() { t.queryChannelsTs[ch] = ts } diff --git a/internal/querynodev2/segments/count_reducer.go b/internal/querynodev2/segments/count_reducer.go index 99134147b7..6c4f7142c0 100644 --- a/internal/querynodev2/segments/count_reducer.go +++ b/internal/querynodev2/segments/count_reducer.go @@ -13,8 +13,10 @@ type cntReducer struct{} func (r *cntReducer) Reduce(ctx context.Context, results []*internalpb.RetrieveResults) (*internalpb.RetrieveResults, error) { cnt := int64(0) allRetrieveCount := int64(0) + relatedDataSize := int64(0) for _, res := range results { allRetrieveCount += res.GetAllRetrieveCount() + relatedDataSize += res.GetCostAggregation().GetTotalRelatedDataSize() c, err := funcutil.CntOfInternalResult(res) if err != nil { return nil, err @@ -23,6 +25,9 @@ func (r *cntReducer) Reduce(ctx context.Context, results []*internalpb.RetrieveR } res := funcutil.WrapCntToInternalResult(cnt) res.AllRetrieveCount = allRetrieveCount + res.CostAggregation = &internalpb.CostAggregation{ + TotalRelatedDataSize: relatedDataSize, + } return res, nil } diff --git a/internal/querynodev2/segments/result.go b/internal/querynodev2/segments/result.go index ddb1b5fbbb..60df6c1554 100644 --- a/internal/querynodev2/segments/result.go +++ b/internal/querynodev2/segments/result.go @@ -98,6 +98,13 @@ func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResult return nil, false }) searchResults.CostAggregation = mergeRequestCost(requestCosts) + if searchResults.CostAggregation == nil { + searchResults.CostAggregation = &internalpb.CostAggregation{} + } + relatedDataSize := lo.Reduce(results, func(acc int64, result *internalpb.SearchResults, _ int) int64 { + return acc + result.GetCostAggregation().GetTotalRelatedDataSize() + }, 0) + searchResults.CostAggregation.TotalRelatedDataSize = relatedDataSize searchResults.ChannelsMvcc = channelsMvcc return searchResults, nil } @@ -108,17 +115,16 @@ func ReduceAdvancedSearchResults(ctx context.Context, results []*internalpb.Sear } channelsMvcc := make(map[string]uint64) - for _, r := range results { - for ch, ts := range r.GetChannelsMvcc() { - channelsMvcc[ch] = ts - } - } + relatedDataSize := int64(0) searchResults := &internalpb.SearchResults{ - IsAdvanced: true, - ChannelsMvcc: channelsMvcc, + IsAdvanced: true, } for _, result := range results { + relatedDataSize += result.GetCostAggregation().GetTotalRelatedDataSize() + for ch, ts := range result.GetChannelsMvcc() { + channelsMvcc[ch] = ts + } if !result.GetIsAdvanced() { continue } @@ -127,6 +133,7 @@ func ReduceAdvancedSearchResults(ctx context.Context, results []*internalpb.Sear searchResults.SubResults = append(searchResults.SubResults, result.GetSubResults()...) searchResults.NumQueries = result.GetNumQueries() } + searchResults.ChannelsMvcc = channelsMvcc requestCosts := lo.FilterMap(results, func(result *internalpb.SearchResults, _ int) (*internalpb.CostAggregation, bool) { if paramtable.Get().QueryNodeCfg.EnableWorkerSQCostMetrics.GetAsBool() { return result.GetCostAggregation(), true @@ -139,6 +146,10 @@ func ReduceAdvancedSearchResults(ctx context.Context, results []*internalpb.Sear return nil, false }) searchResults.CostAggregation = mergeRequestCost(requestCosts) + if searchResults.CostAggregation == nil { + searchResults.CostAggregation = &internalpb.CostAggregation{} + } + searchResults.CostAggregation.TotalRelatedDataSize = relatedDataSize return searchResults, nil } @@ -148,13 +159,12 @@ func MergeToAdvancedResults(ctx context.Context, results []*internalpb.SearchRes } channelsMvcc := make(map[string]uint64) - for _, r := range results { - for ch, ts := range r.GetChannelsMvcc() { + relatedDataSize := int64(0) + for index, result := range results { + relatedDataSize += result.GetCostAggregation().GetTotalRelatedDataSize() + for ch, ts := range result.GetChannelsMvcc() { channelsMvcc[ch] = ts } - } - searchResults.ChannelsMvcc = channelsMvcc - for index, result := range results { // we just append here, no need to split subResult and reduce // defer this reduce to proxy subResult := &internalpb.SubSearchResults{ @@ -169,6 +179,7 @@ func MergeToAdvancedResults(ctx context.Context, results []*internalpb.SearchRes searchResults.NumQueries = result.GetNumQueries() searchResults.SubResults = append(searchResults.SubResults, subResult) } + searchResults.ChannelsMvcc = channelsMvcc requestCosts := lo.FilterMap(results, func(result *internalpb.SearchResults, _ int) (*internalpb.CostAggregation, bool) { if paramtable.Get().QueryNodeCfg.EnableWorkerSQCostMetrics.GetAsBool() { return result.GetCostAggregation(), true @@ -181,6 +192,10 @@ func MergeToAdvancedResults(ctx context.Context, results []*internalpb.SearchRes return nil, false }) searchResults.CostAggregation = mergeRequestCost(requestCosts) + if searchResults.CostAggregation == nil { + searchResults.CostAggregation = &internalpb.CostAggregation{} + } + searchResults.CostAggregation.TotalRelatedDataSize = relatedDataSize return searchResults, nil } @@ -366,8 +381,10 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna ) validRetrieveResults := []*internalpb.RetrieveResults{} + relatedDataSize := int64(0) for _, r := range retrieveResults { ret.AllRetrieveCount += r.GetAllRetrieveCount() + relatedDataSize += r.GetCostAggregation().GetTotalRelatedDataSize() size := typeutil.GetSizeOfIDs(r.GetIds()) if r == nil || len(r.GetFieldsData()) == 0 || size == 0 { continue @@ -437,7 +454,10 @@ func MergeInternalRetrieveResult(ctx context.Context, retrieveResults []*interna return nil, false }) ret.CostAggregation = mergeRequestCost(requestCosts) - + if ret.CostAggregation == nil { + ret.CostAggregation = &internalpb.CostAggregation{} + } + ret.CostAggregation.TotalRelatedDataSize = relatedDataSize return ret, nil } diff --git a/internal/querynodev2/segments/retrieve.go b/internal/querynodev2/segments/retrieve.go index 71199c6696..53ba98e8b4 100644 --- a/internal/querynodev2/segments/retrieve.go +++ b/internal/querynodev2/segments/retrieve.go @@ -128,9 +128,12 @@ func retrieveOnSegmentsWithStream(ctx context.Context, segments []Segment, segTy if len(result.GetOffset()) != 0 { if err = svr.Send(&internalpb.RetrieveResults{ - Status: merr.Success(), - Ids: result.GetIds(), - FieldsData: result.GetFieldsData(), + Status: merr.Success(), + Ids: result.GetIds(), + FieldsData: result.GetFieldsData(), + CostAggregation: &internalpb.CostAggregation{ + TotalRelatedDataSize: segment.MemSize(), + }, AllRetrieveCount: result.GetAllRetrieveCount(), }); err != nil { errs[i] = err diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 8ded559bf5..d782d89c15 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -959,10 +959,15 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i collector.Rate.Add(metricsinfo.NQPerSecond, 1) metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(node.GetNodeID(), 10), metrics.QueryLabel).Add(float64(proto.Size(req))) } + relatedDataSize := lo.Reduce(toMergeResults, func(acc int64, result *internalpb.RetrieveResults, _ int) int64 { + return acc + result.GetCostAggregation().GetTotalRelatedDataSize() + }, 0) - if ret.GetCostAggregation() != nil { - ret.GetCostAggregation().ResponseTime = tr.ElapseSpan().Milliseconds() + if ret.CostAggregation == nil { + ret.CostAggregation = &internalpb.CostAggregation{} } + ret.CostAggregation.ResponseTime = tr.ElapseSpan().Milliseconds() + ret.CostAggregation.TotalRelatedDataSize = relatedDataSize return ret, nil } diff --git a/internal/querynodev2/tasks/query_task.go b/internal/querynodev2/tasks/query_task.go index ab08916308..3d51cb9379 100644 --- a/internal/querynodev2/tasks/query_task.go +++ b/internal/querynodev2/tasks/query_task.go @@ -6,6 +6,7 @@ import ( "strconv" "time" + "github.com/samber/lo" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" @@ -129,6 +130,10 @@ func (t *QueryTask) Execute() error { return err } + relatedDataSize := lo.Reduce(querySegments, func(acc int64, seg segments.Segment, _ int) int64 { + return acc + seg.MemSize() + }, 0) + t.result = &internalpb.RetrieveResults{ Base: &commonpb.MsgBase{ SourceID: paramtable.GetNodeID(), @@ -137,7 +142,8 @@ func (t *QueryTask) Execute() error { Ids: reducedResult.Ids, FieldsData: reducedResult.FieldsData, CostAggregation: &internalpb.CostAggregation{ - ServiceTime: tr.ElapseSpan().Milliseconds(), + ServiceTime: tr.ElapseSpan().Milliseconds(), + TotalRelatedDataSize: relatedDataSize, }, AllRetrieveCount: reducedResult.GetAllRetrieveCount(), } diff --git a/internal/querynodev2/tasks/search_task.go b/internal/querynodev2/tasks/search_task.go index 00892bf582..2975bbf8a5 100644 --- a/internal/querynodev2/tasks/search_task.go +++ b/internal/querynodev2/tasks/search_task.go @@ -9,6 +9,7 @@ import ( "strconv" "github.com/golang/protobuf/proto" + "github.com/samber/lo" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -211,6 +212,10 @@ func (t *SearchTask) Execute() error { return nil } + relatedDataSize := lo.Reduce(searchedSegments, func(acc int64, seg segments.Segment, _ int) int64 { + return acc + seg.MemSize() + }, 0) + tr.RecordSpan() blobs, err := segments.ReduceSearchResultsAndFillData( t.ctx, @@ -259,7 +264,8 @@ func (t *SearchTask) Execute() error { SlicedOffset: 1, SlicedNumCount: 1, CostAggregation: &internalpb.CostAggregation{ - ServiceTime: tr.ElapseSpan().Milliseconds(), + ServiceTime: tr.ElapseSpan().Milliseconds(), + TotalRelatedDataSize: relatedDataSize, }, } } diff --git a/internal/util/hookutil/constant.go b/internal/util/hookutil/constant.go index bd4514ff17..18ba04da1d 100644 --- a/internal/util/hookutil/constant.go +++ b/internal/util/hookutil/constant.go @@ -21,15 +21,16 @@ package hookutil var ( // WARN: Please DO NOT modify all constants. - OpTypeKey = "op_type" - DatabaseKey = "database" - UsernameKey = "username" - DataSizeKey = "data_size" - SuccessCntKey = "success_cnt" - FailCntKey = "fail_cnt" - RelatedCntKey = "related_cnt" - NodeIDKey = "id" - DimensionKey = "dim" + OpTypeKey = "op_type" + DatabaseKey = "database" + UsernameKey = "username" + RequestDataSizeKey = "request_data_size" + ResultDataSizeKey = "result_data_size" + RelatedDataSizeKey = "related_data_size" + SuccessCntKey = "success_cnt" + FailCntKey = "fail_cnt" + RelatedCntKey = "related_cnt" + NodeIDKey = "id" OpTypeInsert = "insert" OpTypeDelete = "delete" diff --git a/pkg/metrics/proxy_metrics.go b/pkg/metrics/proxy_metrics.go index de1f25f551..0662795666 100644 --- a/pkg/metrics/proxy_metrics.go +++ b/pkg/metrics/proxy_metrics.go @@ -263,6 +263,15 @@ var ( Help: "count of bytes sent back to sdk", }, []string{nodeIDLabelName}) + // ProxyReportValue records value about the request + ProxyReportValue = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.ProxyRole, + Name: "report_value", + Help: "report value about the request", + }, []string{nodeIDLabelName, msgTypeLabelName, databaseLabelName, usernameLabelName}) + // ProxyLimiterRate records rates of rateLimiter in Proxy. ProxyLimiterRate = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -373,6 +382,7 @@ func RegisterProxy(registry *prometheus.Registry) { registry.MustRegister(ProxyRateLimitReqCount) registry.MustRegister(ProxySlowQueryCount) + registry.MustRegister(ProxyReportValue) } func CleanupProxyDBMetrics(nodeID int64, dbName string) { diff --git a/pkg/util/paramtable/runtime.go b/pkg/util/paramtable/runtime.go index 6ef2326065..7f8e594480 100644 --- a/pkg/util/paramtable/runtime.go +++ b/pkg/util/paramtable/runtime.go @@ -12,6 +12,7 @@ package paramtable import ( + "strconv" "sync" "time" ) @@ -59,6 +60,10 @@ func GetNodeID() UniqueID { return params.RuntimeConfig.NodeID.GetAsInt64() } +func GetStringNodeID() string { + return strconv.FormatInt(GetNodeID(), 10) +} + func SetRole(role string) { params.RuntimeConfig.Role.SetValue(role) } diff --git a/pkg/util/typeutil/get_dim.go b/pkg/util/typeutil/get_dim.go index cbe28b94cc..f65576f446 100644 --- a/pkg/util/typeutil/get_dim.go +++ b/pkg/util/typeutil/get_dim.go @@ -27,14 +27,3 @@ func GetDim(field *schemapb.FieldSchema) (int64, error) { } return int64(dim), nil } - -func GetCollectionDim(collection *schemapb.CollectionSchema) (int64, error) { - for _, fieldSchema := range collection.GetFields() { - dim, err := GetDim(fieldSchema) - if err != nil { - continue - } - return dim, nil - } - return 0, fmt.Errorf("dim not found") -} diff --git a/tests/integration/hellomilvus/hello_milvus_test.go b/tests/integration/hellomilvus/hello_milvus_test.go index c5ff94b37b..76b5edbc0e 100644 --- a/tests/integration/hellomilvus/hello_milvus_test.go +++ b/tests/integration/hellomilvus/hello_milvus_test.go @@ -91,7 +91,7 @@ func (s *HelloMilvusSuite) TestHelloMilvus() { reportInfo := report.(map[string]any) log.Info("insert report info", zap.Any("reportInfo", reportInfo)) s.Equal(hookutil.OpTypeInsert, reportInfo[hookutil.OpTypeKey]) - s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey]) + s.NotEqualValues(0, reportInfo[hookutil.RequestDataSizeKey]) return } } @@ -177,7 +177,8 @@ func (s *HelloMilvusSuite) TestHelloMilvus() { reportInfo := report.(map[string]any) log.Info("search report info", zap.Any("reportInfo", reportInfo)) s.Equal(hookutil.OpTypeSearch, reportInfo[hookutil.OpTypeKey]) - s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey]) + s.NotEqualValues(0, reportInfo[hookutil.ResultDataSizeKey]) + s.NotEqualValues(0, reportInfo[hookutil.RelatedDataSizeKey]) s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey]) return } @@ -200,7 +201,8 @@ func (s *HelloMilvusSuite) TestHelloMilvus() { reportInfo := report.(map[string]any) log.Info("query report info", zap.Any("reportInfo", reportInfo)) s.Equal(hookutil.OpTypeQuery, reportInfo[hookutil.OpTypeKey]) - s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey]) + s.NotEqualValues(0, reportInfo[hookutil.ResultDataSizeKey]) + s.NotEqualValues(0, reportInfo[hookutil.RelatedDataSizeKey]) s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey]) return } diff --git a/tests/integration/partitionkey/partition_key_test.go b/tests/integration/partitionkey/partition_key_test.go index 3bdc098051..6f1fb2a9a6 100644 --- a/tests/integration/partitionkey/partition_key_test.go +++ b/tests/integration/partitionkey/partition_key_test.go @@ -198,7 +198,8 @@ func (s *PartitionKeySuite) TestPartitionKey() { reportInfo := report.(map[string]any) log.Info("search report info", zap.Any("reportInfo", reportInfo)) s.Equal(hookutil.OpTypeSearch, reportInfo[hookutil.OpTypeKey]) - s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey]) + s.NotEqualValues(0, reportInfo[hookutil.ResultDataSizeKey]) + s.NotEqualValues(0, reportInfo[hookutil.RelatedDataSizeKey]) s.EqualValues(rowNum*3, reportInfo[hookutil.RelatedCntKey]) return } @@ -237,7 +238,8 @@ func (s *PartitionKeySuite) TestPartitionKey() { reportInfo := report.(map[string]any) log.Info("search report info", zap.Any("reportInfo", reportInfo)) s.Equal(hookutil.OpTypeSearch, reportInfo[hookutil.OpTypeKey]) - s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey]) + s.NotEqualValues(0, reportInfo[hookutil.ResultDataSizeKey]) + s.NotEqualValues(0, reportInfo[hookutil.RelatedDataSizeKey]) s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey]) return } @@ -267,7 +269,8 @@ func (s *PartitionKeySuite) TestPartitionKey() { reportInfo := report.(map[string]any) log.Info("query report info", zap.Any("reportInfo", reportInfo)) s.Equal(hookutil.OpTypeQuery, reportInfo[hookutil.OpTypeKey]) - s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey]) + s.NotEqualValues(0, reportInfo[hookutil.ResultDataSizeKey]) + s.NotEqualValues(0, reportInfo[hookutil.RelatedDataSizeKey]) s.EqualValues(3*rowNum, reportInfo[hookutil.RelatedCntKey]) return } @@ -301,7 +304,8 @@ func (s *PartitionKeySuite) TestPartitionKey() { reportInfo := report.(map[string]any) log.Info("query report info", zap.Any("reportInfo", reportInfo)) s.Equal(hookutil.OpTypeQuery, reportInfo[hookutil.OpTypeKey]) - s.NotEqualValues(0, reportInfo[hookutil.DataSizeKey]) + s.NotEqualValues(0, reportInfo[hookutil.ResultDataSizeKey]) + s.NotEqualValues(0, reportInfo[hookutil.RelatedDataSizeKey]) s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey]) return }