From 3a5aaeb7adcf1faef18d6bccd1f598337b6a4626 Mon Sep 17 00:00:00 2001
From: bigsheeper
Date: Fri, 14 Oct 2022 18:05:24 +0800
Subject: [PATCH] Fix read queue metrics and memory protection (#19787)

Signed-off-by: bigsheeper
---
 internal/datacoord/services.go                |  6 ---
 internal/querycoordv2/services.go             | 18 +++-----
 .../querynode/flow_graph_service_time_node.go |  1 +
 internal/querynode/task_query.go              |  2 +
 internal/querynode/task_read.go               |  1 -
 internal/querynode/task_search.go             |  2 +
 internal/rootcoord/quota_center.go            |  4 +-
 internal/rootcoord/quota_center_test.go       | 44 ++++++++++++++++++-
 8 files changed, 56 insertions(+), 22 deletions(-)

diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index fbe898f1fc..286977ad03 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -846,10 +846,6 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
 		zap.String("metric_type", metricType))
 
 	if metricType == metricsinfo.SystemInfoMetrics {
-		ret, err := s.metricsCacheManager.GetSystemInfoMetrics()
-		if err == nil && ret != nil {
-			return ret, nil
-		}
 		log.Debug("failed to get system info metrics from cache, recompute instead",
 			zap.Error(err))
 
@@ -862,8 +858,6 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
 			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
 			zap.Error(err))
 
-		s.metricsCacheManager.UpdateSystemInfoMetrics(metrics)
-
 		return metrics, nil
 	}
 
diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go
index 07832f8a47..0f1233aba9 100644
--- a/internal/querycoordv2/services.go
+++ b/internal/querycoordv2/services.go
@@ -573,21 +573,15 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
 		return resp, nil
 	}
 
-	metrics, err := s.metricsCacheManager.GetSystemInfoMetrics()
+	resp.Response, err = s.getSystemInfoMetrics(ctx, req)
 	if err != nil {
-		log.Warn("failed to read metrics from cache, re-calculate it", zap.Error(err))
-		metrics = resp
-		metrics.Response, err = s.getSystemInfoMetrics(ctx, req)
-		if err != nil {
-			msg := "failed to get system info metrics"
-			log.Warn(msg, zap.Error(err))
-			resp.Status = utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err)
-			return resp, nil
-		}
+		msg := "failed to get system info metrics"
+		log.Warn(msg, zap.Error(err))
+		resp.Status = utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err)
+		return resp, nil
 	}
 
-	s.metricsCacheManager.UpdateSystemInfoMetrics(metrics)
-	return metrics, nil
+	return resp, nil
 }
 
 func (s *Server) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
diff --git a/internal/querynode/flow_graph_service_time_node.go b/internal/querynode/flow_graph_service_time_node.go
index 9b263c5d87..f8b9ad31ae 100644
--- a/internal/querynode/flow_graph_service_time_node.go
+++ b/internal/querynode/flow_graph_service_time_node.go
@@ -64,6 +64,7 @@ func (stNode *serviceTimeNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
 		// should not happen, QueryNode should addTSafe before start flow graph
 		panic(fmt.Errorf("serviceTimeNode setTSafe timeout, collectionID = %d, err = %s", stNode.collectionID, err))
 	}
+	rateCol.updateTSafe(stNode.vChannel, serviceTimeMsg.timeRange.timestampMax)
 	p, _ := tsoutil.ParseTS(serviceTimeMsg.timeRange.timestampMax)
 	log.RatedDebug(10.0, "update tSafe:",
 		zap.Any("collectionID", stNode.collectionID),
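Both coordinator hunks above delete the metricsCacheManager read/update pair, so GetMetrics now recomputes system info on every request, and the new serviceTimeNode line feeds each channel's tSafe into the querynode rate collector that these metrics draw from. The rationale for dropping the cache: QuotaCenter polls these metrics to decide memory back-pressure, and a cached snapshot can hide a memory spike until its TTL expires. Below is a minimal sketch of that failure mode; the types and names are illustrative, not the Milvus cache API:

    package main

    import (
        "fmt"
        "time"
    )

    // snapshot is one cached metrics reading (hypothetical type).
    type snapshot struct {
        memUsage uint64
        takenAt  time.Time
    }

    // cachedSource serves the cached reading until a TTL expires.
    type cachedSource struct {
        cached *snapshot
        ttl    time.Duration
        read   func() uint64 // live memory reading
    }

    func (c *cachedSource) get() uint64 {
        if c.cached != nil && time.Since(c.cached.takenAt) < c.ttl {
            return c.cached.memUsage // possibly stale
        }
        v := c.read()
        c.cached = &snapshot{memUsage: v, takenAt: time.Now()}
        return v
    }

    func main() {
        live := uint64(50)
        src := &cachedSource{ttl: time.Minute, read: func() uint64 { return live }}
        fmt.Println(src.get()) // 50, freshly read and cached
        live = 95              // memory spike on the node
        fmt.Println(src.get()) // still 50: back-pressure would kick in too late
    }

Dropping the cache trades a small amount of recompute work for metrics that are never staler than the QuotaCenter polling interval.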
diff --git a/internal/querynode/task_query.go b/internal/querynode/task_query.go
index b7bfbddfb3..d65b9ded18 100644
--- a/internal/querynode/task_query.go
+++ b/internal/querynode/task_query.go
@@ -47,6 +47,8 @@ func (q *queryTask) PreExecute(ctx context.Context) error {
 	if !funcutil.CheckCtxValid(q.Ctx()) {
 		return errors.New("search context timeout")
 	}
+	q.SetStep(TaskStepPreExecute)
+	rateCol.rtCounter.increaseQueueTime(q)
 	return nil
 }
 
diff --git a/internal/querynode/task_read.go b/internal/querynode/task_read.go
index d485ee8e12..1f399d4fea 100644
--- a/internal/querynode/task_read.go
+++ b/internal/querynode/task_read.go
@@ -78,7 +78,6 @@ func (b *baseReadTask) SetStep(step TaskStep) {
 		b.tr.Record("enqueueStart")
 	case TaskStepPreExecute:
 		b.queueDur = b.tr.Record("enqueueEnd")
-		rateCol.rtCounter.increaseQueueTime(b)
 	}
 }
 
diff --git a/internal/querynode/task_search.go b/internal/querynode/task_search.go
index 6c6b8e5f67..621d15a2cc 100644
--- a/internal/querynode/task_search.go
+++ b/internal/querynode/task_search.go
@@ -60,8 +60,10 @@ type searchTask struct {
 func (s *searchTask) PreExecute(ctx context.Context) error {
 	s.SetStep(TaskStepPreExecute)
+	rateCol.rtCounter.increaseQueueTime(s)
 	for _, t := range s.otherTasks {
 		t.SetStep(TaskStepPreExecute)
+		rateCol.rtCounter.increaseQueueTime(t)
 	}
 	s.combinePlaceHolderGroups()
 	return nil
 }
diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go
index 30d6df0c8c..60333bba88 100644
--- a/internal/rootcoord/quota_center.go
+++ b/internal/rootcoord/quota_center.go
@@ -541,7 +541,7 @@ func (q *QuotaCenter) memoryToWaterLevel() float64 {
 				zap.Float64("QueryNodeMemoryHighWaterLevel", queryNodeMemoryHighWaterLevel))
 			return 0
 		}
-		p := (memoryWaterLevel - queryNodeMemoryLowWaterLevel) / (queryNodeMemoryHighWaterLevel - queryNodeMemoryLowWaterLevel)
+		p := (queryNodeMemoryHighWaterLevel - memoryWaterLevel) / (queryNodeMemoryHighWaterLevel - queryNodeMemoryLowWaterLevel)
 		if p < factor {
 			factor = p
 		}
@@ -558,7 +558,7 @@ func (q *QuotaCenter) memoryToWaterLevel() float64 {
 				zap.Float64("DataNodeMemoryHighWaterLevel", dataNodeMemoryHighWaterLevel))
 			return 0
 		}
-		p := (memoryWaterLevel - dataNodeMemoryLowWaterLevel) / (dataNodeMemoryHighWaterLevel - dataNodeMemoryLowWaterLevel)
+		p := (dataNodeMemoryHighWaterLevel - memoryWaterLevel) / (dataNodeMemoryHighWaterLevel - dataNodeMemoryLowWaterLevel)
 		if p < factor {
 			factor = p
 		}
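The three querynode diffs above move queue-time reporting out of baseReadTask.SetStep and into each task's PreExecute; in particular, queryTask.PreExecute previously never called SetStep(TaskStepPreExecute), so query tasks recorded no queue time at all. The quota_center.go sign flip is the memory-protection half of the fix: the old expression evaluated to 0 at the low water level and 1 at the high water level, throttling hardest exactly when memory was healthiest. Below is a standalone sketch of the corrected mapping (the helper name is ours, not Milvus code), using the same numbers as the table-driven test that follows:

    package main

    import "fmt"

    // memoryFactor mirrors the fixed formula: 1 at or below the low water
    // level, falling linearly to 0 at the high water level.
    func memoryFactor(used, total uint64, low, high float64) float64 {
        level := float64(used) / float64(total)
        if level <= low {
            return 1
        }
        if level >= high {
            return 0
        }
        return (high - level) / (high - low)
    }

    func main() {
        fmt.Printf("%.2f\n", memoryFactor(82, 100, 0.8, 0.9))   // 0.80
        fmt.Printf("%.2f\n", memoryFactor(85, 100, 0.8, 0.9))   // 0.50
        fmt.Printf("%.2f\n", memoryFactor(95, 100, 0.85, 0.95)) // 0.00
    }

With low = 0.8 and high = 0.9, a usage of 85/100 gives (0.9 - 0.85) / (0.9 - 0.8) = 0.5, matching the expected factors in the new test cases.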
diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go
index bd9c4b7063..a42c1ba863 100644
--- a/internal/rootcoord/quota_center_test.go
+++ b/internal/rootcoord/quota_center_test.go
@@ -344,7 +344,7 @@ func TestQuotaCenter(t *testing.T) {
 		Params.QuotaConfig.ForceDenyWriting = forceBak
 	})
 
-	t.Run("test memoryToWaterLevel", func(t *testing.T) {
+	t.Run("test memoryToWaterLevel basic", func(t *testing.T) {
 		quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator)
 		factor := quotaCenter.memoryToWaterLevel()
 		assert.Equal(t, float64(1), factor)
@@ -356,6 +356,48 @@ func TestQuotaCenter(t *testing.T) {
 		assert.Equal(t, float64(0), factor)
 	})
 
+	t.Run("test memoryToWaterLevel factors", func(t *testing.T) {
+		quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator)
+		type memCase struct {
+			lowWater       float64
+			highWater      float64
+			memUsage       uint64
+			memTotal       uint64
+			expectedFactor float64
+		}
+		memCases := []memCase{
+			{0.8, 0.9, 80, 100, 1},
+			{0.8, 0.9, 82, 100, 0.8},
+			{0.8, 0.9, 85, 100, 0.5},
+			{0.8, 0.9, 88, 100, 0.2},
+			{0.8, 0.9, 90, 100, 0},
+
+			{0.85, 0.95, 85, 100, 1},
+			{0.85, 0.95, 87, 100, 0.8},
+			{0.85, 0.95, 90, 100, 0.5},
+			{0.85, 0.95, 93, 100, 0.2},
+			{0.85, 0.95, 95, 100, 0},
+		}
+
+		lowBackup := Params.QuotaConfig.QueryNodeMemoryLowWaterLevel
+		highBackup := Params.QuotaConfig.QueryNodeMemoryHighWaterLevel
+
+		for i, c := range memCases {
+			Params.QuotaConfig.QueryNodeMemoryLowWaterLevel = c.lowWater
+			Params.QuotaConfig.QueryNodeMemoryHighWaterLevel = c.highWater
+			quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{
+				Hms: metricsinfo.HardwareMetrics{MemoryUsage: c.memUsage, Memory: c.memTotal}}}
+			factor := quotaCenter.memoryToWaterLevel()
+			if math.Abs(factor-c.expectedFactor) > 0.000001 {
+				t.Errorf("case %d failed: waterLevel[low:%v, high:%v], memMetric[used:%d, total:%d], expectedFactor: %f, actualFactor: %f",
+					i, c.lowWater, c.highWater, c.memUsage, c.memTotal, c.expectedFactor, factor)
+			}
+		}
+
+		Params.QuotaConfig.QueryNodeMemoryLowWaterLevel = lowBackup
+		Params.QuotaConfig.QueryNodeMemoryHighWaterLevel = highBackup
+	})
+
 	t.Run("test diskQuotaExceeded", func(t *testing.T) {
 		quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator)
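Assuming a standard Milvus checkout, the renamed and newly added sub-tests can be exercised with stock Go tooling:

    go test ./internal/rootcoord/ -run TestQuotaCenter -v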