diff --git a/internal/datacoord/metrics_info.go b/internal/datacoord/metrics_info.go index 83ad1b9b39..ab10aa1b23 100644 --- a/internal/datacoord/metrics_info.go +++ b/internal/datacoord/metrics_info.go @@ -76,8 +76,6 @@ func (s *Server) getSystemInfoMetrics( } // for each data node, fetch metrics info - log.Debug("datacoord.getSystemInfoMetrics", - zap.Int("DataNodes number", len(nodes))) for _, node := range nodes { infos, err := s.getDataNodeMetrics(ctx, req, node) if err != nil { diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 9b4fca3aee..7e2c4bb079 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -811,10 +811,6 @@ func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon // GetMetrics returns DataCoord metrics info // it may include SystemMetrics, Topology metrics, etc. func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { - log.Debug("received get metrics request", - zap.Int64("nodeID", Params.DataCoordCfg.GetNodeID()), - zap.String("request", req.Request)) - if s.isClosed() { log.Warn("DataCoord.GetMetrics failed", zap.Int64("node_id", Params.DataCoordCfg.GetNodeID()), @@ -848,14 +844,17 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest }, nil } - log.Debug("DataCoord.GetMetrics", - zap.String("metric_type", metricType)) - if metricType == metricsinfo.SystemInfoMetrics { - log.Debug("failed to get system info metrics from cache, recompute instead", - zap.Error(err)) - metrics, err := s.getSystemInfoMetrics(ctx, req) + if err != nil { + log.Warn("DataCoord GetMetrics failed", zap.Int64("nodeID", Params.DataCoordCfg.GetNodeID()), zap.Error(err)) + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + }, + }, nil + } log.Debug("DataCoord.GetMetrics", zap.Int64("node_id", Params.DataCoordCfg.GetNodeID()), diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 82c29d0934..5c0f0fee1e 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -747,12 +747,7 @@ func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.Sh } // GetMetrics return datanode metrics -// TODO(dragondriver): cache the Metrics and set a retention to the cache func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { - log.Debug("DataNode.GetMetrics", - zap.Int64("node_id", Params.DataNodeCfg.GetNodeID()), - zap.String("req", req.Request)) - if !node.isHealthy() { log.Warn("DataNode.GetMetrics failed", zap.Int64("node_id", Params.DataNodeCfg.GetNodeID()), @@ -782,11 +777,17 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe }, nil } - log.Debug("DataNode.GetMetrics", - zap.String("metric_type", metricType)) - if metricType == metricsinfo.SystemInfoMetrics { systemInfoMetrics, err := node.getSystemInfoMetrics(ctx, req) + if err != nil { + log.Warn("DataNode GetMetrics failed", zap.Int64("nodeID", Params.DataNodeCfg.GetNodeID()), zap.Error(err)) + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + }, + }, nil + } log.Debug("DataNode.GetMetrics", zap.Int64("node_id", Params.DataNodeCfg.GetNodeID()), diff --git a/internal/datanode/metrics_info.go b/internal/datanode/metrics_info.go index 
277cb1f9a0..065d6a6ef7 100644 --- a/internal/datanode/metrics_info.go +++ b/internal/datanode/metrics_info.go @@ -48,13 +48,14 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro if err != nil { return nil, err } - + minFGChannel, minFGTt := rateCol.getMinFlowGraphTt() return &metricsinfo.DataNodeQuotaMetrics{ Hms: metricsinfo.HardwareMetrics{}, Rms: rms, Fgm: metricsinfo.FlowGraphMetric{ - MinFlowGraphTt: rateCol.getMinFlowGraphTt(), - NumFlowGraph: node.flowgraphManager.getFlowGraphNum(), + MinFlowGraphChannel: minFGChannel, + MinFlowGraphTt: minFGTt, + NumFlowGraph: node.flowgraphManager.getFlowGraphNum(), }, }, nil } diff --git a/internal/datanode/rate_collector.go b/internal/datanode/rate_collector.go index a601efb6e1..1131db5ae9 100644 --- a/internal/datanode/rate_collector.go +++ b/internal/datanode/rate_collector.go @@ -57,15 +57,17 @@ func (r *rateCollector) removeFlowGraphChannel(channel string) { delete(r.flowGraphTt, channel) } -// getMinFlowGraphTt returns the minimal time tick of flow graphs. -func (r *rateCollector) getMinFlowGraphTt() Timestamp { +// getMinFlowGraphTt returns the vchannel and minimal time tick of flow graphs. +func (r *rateCollector) getMinFlowGraphTt() (string, Timestamp) { r.flowGraphTtMu.Lock() defer r.flowGraphTtMu.Unlock() minTt := typeutil.MaxTimestamp - for _, t := range r.flowGraphTt { + var channel string + for c, t := range r.flowGraphTt { if minTt > t { minTt = t + channel = c } } - return minTt + return channel, minTt } diff --git a/internal/datanode/rate_collector_test.go b/internal/datanode/rate_collector_test.go index 178c04cef0..67776071af 100644 --- a/internal/datanode/rate_collector_test.go +++ b/internal/datanode/rate_collector_test.go @@ -29,12 +29,14 @@ func TestRateCollector(t *testing.T) { collector, err := newRateCollector() assert.NoError(t, err) - minTt := collector.getMinFlowGraphTt() + c, minTt := collector.getMinFlowGraphTt() + assert.Equal(t, "", c) assert.Equal(t, typeutil.MaxTimestamp, minTt) collector.updateFlowGraphTt("channel1", 100) collector.updateFlowGraphTt("channel2", 200) collector.updateFlowGraphTt("channel3", 50) - minTt = collector.getMinFlowGraphTt() + c, minTt = collector.getMinFlowGraphTt() + assert.Equal(t, "channel3", c) assert.Equal(t, Timestamp(50), minTt) }) } diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 9cf4ea32b0..31c39aa0d1 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -3734,10 +3734,6 @@ func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReque // GetProxyMetrics gets the metrics of proxy, it's an internal interface which is different from GetMetrics interface, // because it only obtains the metrics of Proxy, not including the topological metrics of Query cluster and Data cluster. 
func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { - log.Debug("Proxy.GetProxyMetrics", - zap.Int64("node_id", Params.ProxyCfg.GetNodeID()), - zap.String("req", req.Request)) - if !node.checkHealthy() { log.Warn("Proxy.GetProxyMetrics failed", zap.Int64("node_id", Params.ProxyCfg.GetNodeID()), @@ -3767,9 +3763,6 @@ func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetrics }, nil } - log.Debug("Proxy.GetProxyMetrics", - zap.String("metric_type", metricType)) - req.Base = commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_SystemInfo), commonpbutil.WithMsgID(0), @@ -3796,8 +3789,7 @@ func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetrics log.Debug("Proxy.GetProxyMetrics", zap.Int64("node_id", Params.ProxyCfg.GetNodeID()), zap.String("req", req.Request), - zap.String("metric_type", metricType), - zap.Error(err)) + zap.String("metric_type", metricType)) return proxyMetrics, nil } @@ -4574,7 +4566,6 @@ func (node *Proxy) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.Refr // SetRates limits the rates of requests. func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesRequest) (*commonpb.Status, error) { - log.Debug("SetRates", zap.String("role", typeutil.ProxyRole), zap.Any("rates", request.GetRates())) resp := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, } diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index d47d8aca22..1e84057d85 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -547,7 +547,7 @@ func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { log := log.With(zap.Int64("msgID", req.Base.GetMsgID())) - log.Info("get metrics request received", + log.Debug("get metrics request received", zap.String("metricType", req.GetRequest())) if s.status.Load() != commonpb.StateCode_Healthy { diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index 2eb85aabd6..114d3f32d6 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -1122,7 +1122,6 @@ func (node *QueryNode) ShowConfigurations(ctx context.Context, req *internalpb.S } // GetMetrics return system infos of the query node, such as total memory, memory usage, cpu usage ... 
-// TODO(dragondriver): cache the Metrics and set a retention to the cache func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { if !node.isHealthy() { log.Warn("QueryNode.GetMetrics failed", @@ -1151,21 +1150,31 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), }, - Response: "", }, nil } if metricType == metricsinfo.SystemInfoMetrics { - metrics, err := getSystemInfoMetrics(ctx, req, node) + queryNodeMetrics, err := getSystemInfoMetrics(ctx, req, node) if err != nil { log.Warn("QueryNode.GetMetrics failed", zap.Int64("nodeId", Params.QueryNodeCfg.GetNodeID()), zap.String("req", req.Request), zap.String("metricType", metricType), zap.Error(err)) + return &milvuspb.GetMetricsResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: err.Error(), + }, + }, nil } + log.Debug("QueryNode.GetMetrics", + zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()), + zap.String("req", req.Request), + zap.String("metric_type", metricType), + zap.Any("queryNodeMetrics", queryNodeMetrics)) - return metrics, nil + return queryNodeMetrics, nil } log.Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet", diff --git a/internal/querynode/metrics_info.go b/internal/querynode/metrics_info.go index a5de20641b..ca7f1b177a 100644 --- a/internal/querynode/metrics_info.go +++ b/internal/querynode/metrics_info.go @@ -72,13 +72,15 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error if err != nil { return nil, err } + minFGChannel, minFGTt := rateCol.getMinTSafe() defer rateCol.rtCounter.resetQueueTime() return &metricsinfo.QueryNodeQuotaMetrics{ Hms: metricsinfo.HardwareMetrics{}, Rms: rms, Fgm: metricsinfo.FlowGraphMetric{ - MinFlowGraphTt: rateCol.getMinTSafe(), - NumFlowGraph: node.dataSyncService.getFlowGraphNum(), + MinFlowGraphChannel: minFGChannel, + MinFlowGraphTt: minFGTt, + NumFlowGraph: node.dataSyncService.getFlowGraphNum(), }, SearchQueue: rateCol.rtCounter.getSearchNQInQueue(), QueryQueue: rateCol.rtCounter.getQueryTasksInQueue(), diff --git a/internal/querynode/rate_collector.go b/internal/querynode/rate_collector.go index 51ef282114..6004a4ffa0 100644 --- a/internal/querynode/rate_collector.go +++ b/internal/querynode/rate_collector.go @@ -60,15 +60,17 @@ func (r *rateCollector) removeTSafeChannel(c string) { delete(r.tSafes, c) } -// getMinTSafe returns the minimal tSafe of flow graphs. -func (r *rateCollector) getMinTSafe() Timestamp { +// getMinTSafe returns the vchannel and minimal tSafe of flow graphs. 
+func (r *rateCollector) getMinTSafe() (Channel, Timestamp) { r.tSafesMu.Lock() defer r.tSafesMu.Unlock() + var channel Channel minTt := typeutil.MaxTimestamp - for _, t := range r.tSafes { + for c, t := range r.tSafes { if minTt > t { minTt = t + channel = c } } - return minTt + return channel, minTt } diff --git a/internal/querynode/rate_collector_test.go b/internal/querynode/rate_collector_test.go index f9949cfbbc..7dc3c150b5 100644 --- a/internal/querynode/rate_collector_test.go +++ b/internal/querynode/rate_collector_test.go @@ -29,12 +29,14 @@ func TestRateCollector(t *testing.T) { collector, err := newRateCollector() assert.NoError(t, err) - minTt := collector.getMinTSafe() + c, minTt := collector.getMinTSafe() + assert.Equal(t, "", c) assert.Equal(t, typeutil.MaxTimestamp, minTt) collector.updateTSafe("channel1", 100) collector.updateTSafe("channel2", 200) collector.updateTSafe("channel3", 50) - minTt = collector.getMinTSafe() + c, minTt = collector.getMinTSafe() + assert.Equal(t, "channel3", c) assert.Equal(t, Timestamp(50), minTt) }) } diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index 984761efbf..20f303da31 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/ratelimitutil" "github.com/milvus-io/milvus/internal/util/tsoutil" + "github.com/milvus-io/milvus/internal/util/typeutil" ) const ( @@ -77,9 +78,9 @@ type Limit = ratelimitutil.Limit // Protections: // 1. TT protection -> dqlRate = maxDQLRate * (maxDelay - ttDelay) / maxDelay // 2. Memory protection -> dmlRate = maxDMLRate * (highMem - curMem) / (highMem - lowMem) -// 3. Disk quota protection -> force deny writing if exceeded -// 4. DQL Queue length protection -> dqlRate = curDQLRate * CoolOffSpeed -// 5. DQL queue latency protection -> dqlRate = curDQLRate * CoolOffSpeed +// 3. Disk quota protection -> force deny writing if exceeded +// 4. DQL Queue length protection -> dqlRate = curDQLRate * CoolOffSpeed +// 5. DQL queue latency protection -> dqlRate = curDQLRate * CoolOffSpeed // 6. Search result protection -> searchRate = curSearchRate * CoolOffSpeed // // If necessary, user can also manually force to deny RW requests. 
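For reference, the two scaling formulas named in the QuotaCenter protections comment above (TT protection and memory protection) can be reproduced in a few lines. The sketch below is standalone and uses made-up sample values; none of the names or numbers come from the patch.

package main

import "fmt"

// Standalone illustration of the two protection formulas quoted in the
// QuotaCenter doc comment; all values here are hypothetical.
func main() {
	// 1. TT protection: dqlRate = maxDQLRate * (maxDelay - ttDelay) / maxDelay
	maxDQLRate, maxDelay, ttDelay := 100.0, 300.0, 120.0 // rate in req/s, delays in ms (sample values)
	dqlRate := maxDQLRate * (maxDelay - ttDelay) / maxDelay
	fmt.Println("dqlRate:", dqlRate) // 60

	// 2. Memory protection: dmlRate = maxDMLRate * (highMem - curMem) / (highMem - lowMem)
	maxDMLRate, highMem, lowMem, curMem := 50.0, 0.9, 0.8, 0.85 // memory levels as fractions of total
	dmlRate := maxDMLRate * (highMem - curMem) / (highMem - lowMem)
	fmt.Println("dmlRate:", dmlRate) // 25
}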
@@ -131,17 +132,17 @@ func (q *QuotaCenter) run() { case <-ticker.C: err := q.syncMetrics() if err != nil { - log.Error("quotaCenter sync metrics failed", zap.Error(err)) + log.Warn("quotaCenter sync metrics failed", zap.Error(err)) break } err = q.calculateRates() if err != nil { - log.Error("quotaCenter calculate rates failed", zap.Error(err)) + log.Warn("quotaCenter calculate rates failed", zap.Error(err)) break } err = q.setRates() if err != nil { - log.Error("quotaCenter setRates failed", zap.Error(err)) + log.Warn("quotaCenter setRates failed", zap.Error(err)) } } } @@ -241,12 +242,11 @@ func (q *QuotaCenter) syncMetrics() error { if err != nil { return err } - // TODO: use rated log - log.Debug("QuotaCenter sync metrics done", - zap.Any("dataNodeMetrics", q.dataNodeMetrics), - zap.Any("queryNodeMetrics", q.queryNodeMetrics), - zap.Any("proxyMetrics", q.proxyMetrics), - zap.Any("dataCoordMetrics", q.dataCoordMetrics)) + //log.Debug("QuotaCenter sync metrics done", + // zap.Any("dataNodeMetrics", q.dataNodeMetrics), + // zap.Any("queryNodeMetrics", q.queryNodeMetrics), + // zap.Any("proxyMetrics", q.proxyMetrics), + // zap.Any("dataCoordMetrics", q.dataCoordMetrics)) return nil } @@ -294,14 +294,18 @@ func (q *QuotaCenter) calculateReadRates() { coolOffSpeed := Params.QuotaConfig.CoolOffSpeed coolOff := func(realTimeSearchRate float64, realTimeQueryRate float64) { - if q.currentRates[internalpb.RateType_DQLSearch] != Inf { + if q.currentRates[internalpb.RateType_DQLSearch] != Inf && realTimeSearchRate > 0 { q.currentRates[internalpb.RateType_DQLSearch] = Limit(realTimeSearchRate * coolOffSpeed) } - if q.currentRates[internalpb.RateType_DQLQuery] != Inf { + if q.currentRates[internalpb.RateType_DQLQuery] != Inf && realTimeQueryRate > 0 { q.currentRates[internalpb.RateType_DQLQuery] = Limit(realTimeQueryRate * coolOffSpeed) } q.guaranteeMinRate(Params.QuotaConfig.DQLMinSearchRate, internalpb.RateType_DQLSearch) q.guaranteeMinRate(Params.QuotaConfig.DQLMinQueryRate, internalpb.RateType_DQLQuery) + log.Warn("QuotaCenter cool read rates off done", + zap.Any("searchRate", q.currentRates[internalpb.RateType_DQLSearch]), + zap.Any("queryRate", q.currentRates[internalpb.RateType_DQLQuery])) + log.Info("QueryNodeMetrics when cool-off", zap.Any("metrics", q.queryNodeMetrics)) } // TODO: unify search and query?
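The hunk above tightens the cool-off closure so a rate is only scaled down when its real-time counterpart is positive, and logs the resulting rates. A simplified standalone version of that pattern is sketched below; the readLimiter type, field names, and minRate floor are stand-ins rather than the QuotaCenter API.

package main

import "fmt"

// readLimiter is a stand-in for the QuotaCenter rate bookkeeping; it keeps
// only the two read rates that the cool-off touches.
type readLimiter struct {
	searchRate float64
	queryRate  float64
}

// coolOff scales each current rate down by coolOffSpeed when there is real
// traffic for it, and never lets the result drop below minRate.
func (l *readLimiter) coolOff(realTimeSearchRate, realTimeQueryRate, coolOffSpeed, minRate float64) {
	if realTimeSearchRate > 0 {
		l.searchRate = floorRate(realTimeSearchRate*coolOffSpeed, minRate)
	}
	if realTimeQueryRate > 0 {
		l.queryRate = floorRate(realTimeQueryRate*coolOffSpeed, minRate)
	}
}

// floorRate returns v, but never less than min.
func floorRate(v, min float64) float64 {
	if v < min {
		return min
	}
	return v
}

func main() {
	l := &readLimiter{searchRate: 100, queryRate: 40}
	l.coolOff(80, 0, 0.9, 10)              // only search had traffic, so only searchRate changes
	fmt.Println(l.searchRate, l.queryRate) // 72 40
}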
@@ -309,21 +313,18 @@ func (q *QuotaCenter) calculateReadRates() { realTimeQueryRate := q.getRealTimeRate(internalpb.RateType_DQLQuery) queueLatencyFactor := q.getQueryLatencyFactor() - log.Debug("QuotaCenter getQueryLatencyFactor done", zap.Float64("queueLatencyFactor", queueLatencyFactor)) if Limit(queueLatencyFactor) == Limit(coolOffSpeed) { coolOff(realTimeSearchRate, realTimeQueryRate) return } queueLengthFactor := q.getNQInQueryFactor() - log.Debug("QuotaCenter getNQInQueryFactor done", zap.Float64("queueLengthFactor", queueLengthFactor)) if Limit(queueLengthFactor) == Limit(coolOffSpeed) { coolOff(realTimeSearchRate, realTimeQueryRate) return } resultRateFactor := q.getReadResultFactor() - log.Debug("QuotaCenter getReadResultFactor done", zap.Float64("resultRateFactor", resultRateFactor)) if Limit(resultRateFactor) == Limit(coolOffSpeed) { coolOff(realTimeSearchRate, realTimeQueryRate) } @@ -341,7 +342,6 @@ func (q *QuotaCenter) calculateWriteRates() error { q.forceDenyWriting(DiskQuotaExceeded) // disk quota protection return nil } - log.Debug("QuotaCenter check diskQuota done", zap.Bool("exceeded", exceeded)) ts, err := q.tsoAllocator.GenerateTSO(1) if err != nil { @@ -352,14 +352,12 @@ func (q *QuotaCenter) calculateWriteRates() error { q.forceDenyWriting(TimeTickLongDelay) // tt protection return nil } - log.Debug("QuotaCenter check getTimeTickDelayFactor done", zap.Float64("ttFactor", ttFactor)) memFactor := q.getMemoryFactor() if memFactor <= 0 { q.forceDenyWriting(MemoryExhausted) // memory protection return nil } - log.Debug("QuotaCenter check memoryWaterLevel done", zap.Float64("memFactor", memFactor)) if memFactor < ttFactor { ttFactor = memFactor @@ -386,7 +384,7 @@ func (q *QuotaCenter) calculateRates() error { } q.calculateReadRates() - log.Debug("QuotaCenter calculates rate done", zap.Any("rates", q.currentRates)) + // log.Debug("QuotaCenter calculates rate done", zap.Any("rates", q.currentRates)) return nil } @@ -415,27 +413,35 @@ func (q *QuotaCenter) resetCurrentRates() { // getTimeTickDelayFactor gets time tick delay of DataNodes and QueryNodes, // and return the factor according to max tolerable time tick delay. 
func (q *QuotaCenter) getTimeTickDelayFactor(ts Timestamp) float64 { - t1, _ := tsoutil.ParseTS(ts) + var curMaxDelay time.Duration + var role, vchannel string + var minTt time.Time - var maxDelay time.Duration + t1, _ := tsoutil.ParseTS(ts) for nodeID, metric := range q.queryNodeMetrics { if metric.Fgm.NumFlowGraph > 0 { t2, _ := tsoutil.ParseTS(metric.Fgm.MinFlowGraphTt) delay := t1.Sub(t2) - if delay.Nanoseconds() > maxDelay.Nanoseconds() { - maxDelay = delay + if delay.Nanoseconds() > curMaxDelay.Nanoseconds() { + curMaxDelay = delay + role = fmt.Sprintf("%s-%d", typeutil.QueryNodeRole, nodeID) + vchannel = metric.Fgm.MinFlowGraphChannel + minTt = t2 } - metrics.RootCoordTtDelay.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(maxDelay.Milliseconds())) + metrics.RootCoordTtDelay.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(curMaxDelay.Milliseconds())) } } for nodeID, metric := range q.dataNodeMetrics { if metric.Fgm.NumFlowGraph > 0 { t2, _ := tsoutil.ParseTS(metric.Fgm.MinFlowGraphTt) delay := t1.Sub(t2) - if delay.Nanoseconds() > maxDelay.Nanoseconds() { - maxDelay = delay + if delay.Nanoseconds() > curMaxDelay.Nanoseconds() { + curMaxDelay = delay + role = fmt.Sprintf("%s-%d", typeutil.DataNodeRole, nodeID) + vchannel = metric.Fgm.MinFlowGraphChannel + minTt = t2 } - metrics.RootCoordTtDelay.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(maxDelay.Milliseconds())) + metrics.RootCoordTtDelay.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(curMaxDelay.Milliseconds())) } } @@ -443,17 +449,26 @@ func (q *QuotaCenter) getTimeTickDelayFactor(ts Timestamp) float64 { return 1 } - maxTt := Params.QuotaConfig.MaxTimeTickDelay - if maxTt < 0 { + maxDelay := Params.QuotaConfig.MaxTimeTickDelay + if maxDelay < 0 { // < 0 means disable tt protection return 1 } - log.Debug("QuotaCenter check timeTick delay", zap.Time("curTs", t1), zap.Duration("maxDelay", maxDelay)) - if maxDelay.Nanoseconds() >= maxTt.Nanoseconds() { + if curMaxDelay.Nanoseconds() >= maxDelay.Nanoseconds() { + log.Warn("QuotaCenter force deny writing due to long timeTick delay", + zap.String("node", role), + zap.String("vchannel", vchannel), + zap.Time("curTs", t1), + zap.Time("minTs", minTt), + zap.Duration("delay", curMaxDelay), + zap.Duration("MaxDelay", maxDelay)) + log.Info("DataNode and QueryNode Metrics", + zap.Any("QueryNodeMetrics", q.queryNodeMetrics), + zap.Any("DataNodeMetrics", q.dataNodeMetrics)) return 0 } - return float64(maxTt.Nanoseconds()-maxDelay.Nanoseconds()) / float64(maxTt.Nanoseconds()) + return float64(maxDelay.Nanoseconds()-curMaxDelay.Nanoseconds()) / float64(maxDelay.Nanoseconds()) } // getNQInQueryFactor checks search&query nq in QueryNode, @@ -540,37 +555,53 @@ func (q *QuotaCenter) getMemoryFactor() float64 { queryNodeMemoryLowWaterLevel := Params.QuotaConfig.QueryNodeMemoryLowWaterLevel queryNodeMemoryHighWaterLevel := Params.QuotaConfig.QueryNodeMemoryHighWaterLevel - for _, metric := range q.queryNodeMetrics { + for nodeID, metric := range q.queryNodeMetrics { memoryWaterLevel := float64(metric.Hms.MemoryUsage) / float64(metric.Hms.Memory) if memoryWaterLevel <= queryNodeMemoryLowWaterLevel { continue } if memoryWaterLevel >= queryNodeMemoryHighWaterLevel { - log.Debug("QuotaCenter: QueryNode memory to high water level", + log.Warn("QuotaCenter: QueryNode memory to high water level", + zap.String("Node", fmt.Sprintf("%s-%d", typeutil.QueryNodeRole, nodeID)), zap.Uint64("UsedMem", metric.Hms.MemoryUsage), zap.Uint64("TotalMem", metric.Hms.Memory), - 
zap.Float64("QueryNodeMemoryHighWaterLevel", queryNodeMemoryHighWaterLevel)) + zap.Float64("memoryWaterLevel", memoryWaterLevel), + zap.Float64("memoryHighWaterLevel", queryNodeMemoryHighWaterLevel)) return 0 } p := (queryNodeMemoryHighWaterLevel - memoryWaterLevel) / (queryNodeMemoryHighWaterLevel - queryNodeMemoryLowWaterLevel) if p < factor { + log.Warn("QuotaCenter: QueryNode memory to low water level, limit writing rate", + zap.String("Node", fmt.Sprintf("%s-%d", typeutil.QueryNodeRole, nodeID)), + zap.Uint64("UsedMem", metric.Hms.MemoryUsage), + zap.Uint64("TotalMem", metric.Hms.Memory), + zap.Float64("memoryWaterLevel", memoryWaterLevel), + zap.Float64("memoryLowWaterLevel", queryNodeMemoryLowWaterLevel)) factor = p } } - for _, metric := range q.dataNodeMetrics { + for nodeID, metric := range q.dataNodeMetrics { memoryWaterLevel := float64(metric.Hms.MemoryUsage) / float64(metric.Hms.Memory) if memoryWaterLevel <= dataNodeMemoryLowWaterLevel { continue } if memoryWaterLevel >= dataNodeMemoryHighWaterLevel { - log.Debug("QuotaCenter: DataNode memory to high water level", + log.Warn("QuotaCenter: DataNode memory to high water level", + zap.String("Node", fmt.Sprintf("%s-%d", typeutil.DataNodeRole, nodeID)), zap.Uint64("UsedMem", metric.Hms.MemoryUsage), zap.Uint64("TotalMem", metric.Hms.Memory), - zap.Float64("DataNodeMemoryHighWaterLevel", dataNodeMemoryHighWaterLevel)) + zap.Float64("memoryWaterLevel", memoryWaterLevel), + zap.Float64("memoryHighWaterLevel", dataNodeMemoryHighWaterLevel)) return 0 } p := (dataNodeMemoryHighWaterLevel - memoryWaterLevel) / (dataNodeMemoryHighWaterLevel - dataNodeMemoryLowWaterLevel) if p < factor { + log.Warn("QuotaCenter: DataNode memory to low water level, limit writing rate", + zap.String("Node", fmt.Sprintf("%s-%d", typeutil.DataNodeRole, nodeID)), + zap.Uint64("UsedMem", metric.Hms.MemoryUsage), + zap.Uint64("TotalMem", metric.Hms.Memory), + zap.Float64("memoryWaterLevel", memoryWaterLevel), + zap.Float64("memoryLowWaterLevel", dataNodeMemoryLowWaterLevel)) factor = p } } @@ -587,7 +618,15 @@ func (q *QuotaCenter) ifDiskQuotaExceeded() bool { } diskQuota := Params.QuotaConfig.DiskQuota totalSize := q.dataCoordMetrics.TotalBinlogSize - return float64(totalSize) >= diskQuota + if float64(totalSize) >= diskQuota { + log.Warn("QuotaCenter: disk quota exceeded", + zap.Int64("curDiskUsage", totalSize), + zap.Float64("diskQuota", diskQuota)) + log.Info("DataCoordMetric", + zap.Any("metric", q.dataCoordMetrics)) + return true + } + return false } // setRates notifies Proxies to set rates for different rate types. 
diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index 9109496ef7..ae93ae13f2 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -386,12 +386,14 @@ func TestQuotaCenter(t *testing.T) { expectedFactor float64 } memCases := []memCase{ + {0.8, 0.9, 10, 100, 1}, {0.8, 0.9, 80, 100, 1}, {0.8, 0.9, 82, 100, 0.8}, {0.8, 0.9, 85, 100, 0.5}, {0.8, 0.9, 88, 100, 0.2}, {0.8, 0.9, 90, 100, 0}, + {0.85, 0.95, 25, 100, 1}, {0.85, 0.95, 85, 100, 1}, {0.85, 0.95, 87, 100, 0.8}, {0.85, 0.95, 90, 100, 0.5}, diff --git a/internal/util/metricsinfo/quota_metric.go b/internal/util/metricsinfo/quota_metric.go index dd5a3fe70e..3a49fc910d 100644 --- a/internal/util/metricsinfo/quota_metric.go +++ b/internal/util/metricsinfo/quota_metric.go @@ -41,8 +41,9 @@ type RateMetric struct { // FlowGraphMetric contains a minimal timestamp of flow graph and the number of flow graphs. type FlowGraphMetric struct { - MinFlowGraphTt typeutil.Timestamp - NumFlowGraph int + MinFlowGraphChannel string + MinFlowGraphTt typeutil.Timestamp + NumFlowGraph int } // ReadInfoInQueue contains NQ num or task num in QueryNode's task queue.
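The FlowGraphMetric change above adds MinFlowGraphChannel next to MinFlowGraphTt, and both rate collectors now return the channel together with the minimal tick. The following standalone sketch mirrors that pattern with locally declared stand-in types (in Milvus, Timestamp is typeutil.Timestamp, i.e. uint64, and the struct lives in the metricsinfo package).

package main

import (
	"fmt"
	"math"
)

// Local stand-ins for the types touched by the patch.
type Timestamp = uint64

type FlowGraphMetric struct {
	MinFlowGraphChannel string
	MinFlowGraphTt      Timestamp
	NumFlowGraph        int
}

// minFlowGraphTt mirrors the pattern used by getMinFlowGraphTt/getMinTSafe:
// scan the channel->tt map and keep both the smallest tt and its channel.
func minFlowGraphTt(flowGraphTt map[string]Timestamp) (string, Timestamp) {
	minTt := Timestamp(math.MaxUint64)
	var channel string
	for c, t := range flowGraphTt {
		if t < minTt {
			minTt = t
			channel = c
		}
	}
	return channel, minTt
}

func main() {
	tts := map[string]Timestamp{"channel1": 100, "channel2": 200, "channel3": 50}
	c, tt := minFlowGraphTt(tts)
	fmt.Println(FlowGraphMetric{MinFlowGraphChannel: c, MinFlowGraphTt: tt, NumFlowGraph: len(tts)})
	// {channel3 50 3}
}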