diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 2a0aef0d66..addc51ad5d 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -449,7 +449,7 @@ quotaAndLimits:
     diskProtection:
       # When the total file size of object storage is greater than `diskQuota`, all dml requests would be rejected;
       enabled: true
-      diskQuota: -1 # GB, (0, +inf), default no limit
+      diskQuota: -1 # MB, (0, +inf), default no limit
   # limitReading decides whether dql requests are allowed.
   limitReading:
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index 463bbe82c9..4156ced687 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -205,6 +205,7 @@ func (m *meta) GetTotalBinlogSize() int64 {
 	for _, segment := range segments {
 		ret += segment.getSegmentSize()
 	}
+	metrics.DataCoordStoredBinlogSize.WithLabelValues().Set(float64(ret))
 	return ret
 }
diff --git a/internal/metrics/datacoord_metrics.go b/internal/metrics/datacoord_metrics.go
index 026f93e20a..5c46ea4bd6 100644
--- a/internal/metrics/datacoord_metrics.go
+++ b/internal/metrics/datacoord_metrics.go
@@ -84,6 +84,14 @@ var (
 			Help:      "synchronized unix epoch per physical channel",
 		}, []string{channelNameLabelName})
 
+	DataCoordStoredBinlogSize = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.DataCoordRole,
+			Name:      "stored_binlog_size",
+			Help:      "binlog size of all collections/segments",
+		}, []string{})
+
 	/* hard to implement, commented now
 	DataCoordSegmentSizeRatio = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
@@ -139,4 +147,5 @@ func RegisterDataCoord(registry *prometheus.Registry) {
 	registry.MustRegister(DataCoordNumStoredRows)
 	registry.MustRegister(DataCoordNumStoredRowsCounter)
 	registry.MustRegister(DataCoordSyncEpoch)
+	registry.MustRegister(DataCoordStoredBinlogSize)
 }
diff --git a/internal/metrics/rootcoord_metrics.go b/internal/metrics/rootcoord_metrics.go
index 2f64f0aa56..c8e6554068 100644
--- a/internal/metrics/rootcoord_metrics.go
+++ b/internal/metrics/rootcoord_metrics.go
@@ -133,6 +133,17 @@ var (
 			Name:      "num_of_roles",
 			Help:      "The number of roles",
 		})
+
+	// RootCoordTtDelay records the max time tick delay of flow graphs in DataNodes and QueryNodes.
+	RootCoordTtDelay = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: milvusNamespace,
+			Subsystem: typeutil.RootCoordRole,
+			Name:      "time_tick_delay",
+			Help:      "The max time tick delay of flow graphs",
+		}, []string{
+			nodeIDLabelName,
+		})
 )
 
 //RegisterRootCoord registers RootCoord metrics
@@ -163,4 +174,5 @@ func RegisterRootCoord(registry *prometheus.Registry) {
 	registry.MustRegister(RootCoordNumOfCredentials)
 	registry.MustRegister(RootCoordNumOfRoles)
 
+	registry.MustRegister(RootCoordTtDelay)
 }
diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go
index 9fcaf93b40..09c5e40e18 100644
--- a/internal/rootcoord/quota_center.go
+++ b/internal/rootcoord/quota_center.go
@@ -19,6 +19,7 @@ package rootcoord
 import (
 	"context"
 	"fmt"
+	"strconv"
 	"sync"
 	"time"
 
@@ -27,6 +28,7 @@
 	"github.com/milvus-io/milvus-proto/go-api/commonpb"
 
 	"github.com/milvus-io/milvus/internal/log"
+	"github.com/milvus-io/milvus/internal/metrics"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/proto/proxypb"
 	"github.com/milvus-io/milvus/internal/tso"
@@ -34,7 +36,6 @@
 	"github.com/milvus-io/milvus/internal/util/metricsinfo"
 	"github.com/milvus-io/milvus/internal/util/ratelimitutil"
 	"github.com/milvus-io/milvus/internal/util/tsoutil"
-	"github.com/milvus-io/milvus/internal/util/typeutil"
 )
 
 const (
@@ -86,9 +87,9 @@ type QuotaCenter struct {
 	dataCoord  types.DataCoord
 
 	// metrics
-	queryNodeMetrics []*metricsinfo.QueryNodeQuotaMetrics
-	dataNodeMetrics  []*metricsinfo.DataNodeQuotaMetrics
-	proxyMetrics     []*metricsinfo.ProxyQuotaMetrics
+	queryNodeMetrics map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics
+	dataNodeMetrics  map[UniqueID]*metricsinfo.DataNodeQuotaMetrics
+	proxyMetrics     map[UniqueID]*metricsinfo.ProxyQuotaMetrics
 	dataCoordMetrics *metricsinfo.DataCoordQuotaMetrics
 
 	currentRates map[internalpb.RateType]Limit
@@ -152,9 +153,9 @@ func (q *QuotaCenter) stop() {
 
 // clearMetrics removes all metrics stored in QuotaCenter.
 func (q *QuotaCenter) clearMetrics() {
-	q.dataNodeMetrics = make([]*metricsinfo.DataNodeQuotaMetrics, 0)
-	q.queryNodeMetrics = make([]*metricsinfo.QueryNodeQuotaMetrics, 0)
-	q.proxyMetrics = make([]*metricsinfo.ProxyQuotaMetrics, 0)
+	q.dataNodeMetrics = make(map[UniqueID]*metricsinfo.DataNodeQuotaMetrics, 0)
+	q.queryNodeMetrics = make(map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics, 0)
+	q.proxyMetrics = make(map[UniqueID]*metricsinfo.ProxyQuotaMetrics, 0)
 }
 
 // syncMetrics sends GetMetrics requests to DataCoord and QueryCoord to sync the metrics in DataNodes and QueryNodes.
@@ -185,7 +186,7 @@ func (q *QuotaCenter) syncMetrics() error {
 		}
 		for _, queryNodeMetric := range queryCoordTopology.Cluster.ConnectedNodes {
 			if queryNodeMetric.QuotaMetrics != nil {
-				q.queryNodeMetrics = append(q.queryNodeMetrics, queryNodeMetric.QuotaMetrics)
+				q.queryNodeMetrics[queryNodeMetric.ID] = queryNodeMetric.QuotaMetrics
 			}
 		}
 		return nil
@@ -206,7 +207,7 @@ func (q *QuotaCenter) syncMetrics() error {
 		}
 		for _, dataNodeMetric := range dataCoordTopology.Cluster.ConnectedNodes {
 			if dataNodeMetric.QuotaMetrics != nil {
-				q.dataNodeMetrics = append(q.dataNodeMetrics, dataNodeMetric.QuotaMetrics)
+				q.dataNodeMetrics[dataNodeMetric.ID] = dataNodeMetric.QuotaMetrics
 			}
 		}
 		if dataCoordTopology.Cluster.Self.QuotaMetrics != nil {
@@ -228,7 +229,7 @@
 				return err
 			}
 			if proxyMetric.QuotaMetrics != nil {
-				q.proxyMetrics = append(q.proxyMetrics, proxyMetric.QuotaMetrics)
+				q.proxyMetrics[proxyMetric.ID] = proxyMetric.QuotaMetrics
 			}
 		}
 		return nil
@@ -339,10 +340,11 @@
 	}
 	log.Debug("QuotaCenter check diskQuota done", zap.Bool("exceeded", exceeded))
 
-	ttFactor, err := q.timeTickDelay()
+	ts, err := q.tsoAllocator.GenerateTSO(1)
 	if err != nil {
 		return err
 	}
+	ttFactor := q.timeTickDelay(ts)
 	if ttFactor <= 0 {
 		q.forceDenyWriting(TimeTickLongDelay) // tt protection
 		return nil
@@ -409,43 +411,46 @@ func (q *QuotaCenter) resetCurrentRates() {
 
 // timeTickDelay gets time tick delay of DataNodes and QueryNodes,
 // and return the factor according to max tolerable time tick delay.
-func (q *QuotaCenter) timeTickDelay() (float64, error) {
+func (q *QuotaCenter) timeTickDelay(ts Timestamp) float64 {
+	t1, _ := tsoutil.ParseTS(ts)
+
+	var maxDelay time.Duration
+	for nodeID, metric := range q.queryNodeMetrics {
+		if metric.Fgm.NumFlowGraph > 0 {
+			t2, _ := tsoutil.ParseTS(metric.Fgm.MinFlowGraphTt)
+			delay := t1.Sub(t2)
+			if delay.Nanoseconds() > maxDelay.Nanoseconds() {
+				maxDelay = delay
+			}
+			metrics.RootCoordTtDelay.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(maxDelay.Milliseconds()))
+		}
+	}
+	for nodeID, metric := range q.dataNodeMetrics {
+		if metric.Fgm.NumFlowGraph > 0 {
+			t2, _ := tsoutil.ParseTS(metric.Fgm.MinFlowGraphTt)
+			delay := t1.Sub(t2)
+			if delay.Nanoseconds() > maxDelay.Nanoseconds() {
+				maxDelay = delay
+			}
+			metrics.RootCoordTtDelay.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(float64(maxDelay.Milliseconds()))
+		}
+	}
+
 	if !Params.QuotaConfig.TtProtectionEnabled {
-		return 1, nil
+		return 1
 	}
 
 	maxTt := Params.QuotaConfig.MaxTimeTickDelay
 	if maxTt < 0 {
 		// < 0 means disable tt protection
-		return 1, nil
+		return 1
 	}
 
-	minTs := typeutil.MaxTimestamp
-	for _, metric := range q.queryNodeMetrics {
-		if metric.Fgm.NumFlowGraph > 0 && metric.Fgm.MinFlowGraphTt < minTs {
-			minTs = metric.Fgm.MinFlowGraphTt
-		}
+	log.Debug("QuotaCenter check timeTick delay", zap.Time("curTs", t1), zap.Duration("maxDelay", maxDelay))
+	if maxDelay.Nanoseconds() >= maxTt.Nanoseconds() {
+		return 0
 	}
-	for _, metric := range q.dataNodeMetrics {
-		if metric.Fgm.NumFlowGraph > 0 && metric.Fgm.MinFlowGraphTt < minTs {
-			minTs = metric.Fgm.MinFlowGraphTt
-		}
-	}
-	ts, err := q.tsoAllocator.GenerateTSO(1)
-	if err != nil {
-		return 0, err
-	}
-	if minTs >= ts {
-		return 1, nil
-	}
-	t1, _ := tsoutil.ParseTS(minTs)
-	t2, _ := tsoutil.ParseTS(ts)
-	delay := t2.Sub(t1)
-	log.Debug("QuotaCenter check timeTick delay", zap.Time("minTs", t1), zap.Time("curTs", t2), zap.Duration("delay", delay))
-	if delay.Nanoseconds() >= maxTt.Nanoseconds() {
-		return 0, nil
-	}
-	return float64(maxTt.Nanoseconds()-delay.Nanoseconds()) / float64(maxTt.Nanoseconds()), nil
+	return float64(maxTt.Nanoseconds()-maxDelay.Nanoseconds()) / float64(maxTt.Nanoseconds())
 }
 
 // checkNQInQuery checks search&query nq in QueryNode,
diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go
index f642699d67..c9ec1e37df 100644
--- a/internal/rootcoord/quota_center_test.go
+++ b/internal/rootcoord/quota_center_test.go
@@ -138,8 +138,7 @@ func TestQuotaCenter(t *testing.T) {
 	t.Run("test timeTickDelay", func(t *testing.T) {
 		// test MaxTimestamp
 		quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator)
-		factor, err := quotaCenter.timeTickDelay()
-		assert.NoError(t, err)
+		factor := quotaCenter.timeTickDelay(0)
 		assert.Equal(t, float64(1), factor)
 
 		now := time.Now()
@@ -155,14 +154,14 @@
 			return ts, nil
 		}
 		quotaCenter.tsoAllocator = alloc
-		quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{
-			Fgm: metricsinfo.FlowGraphMetric{
+		quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{
+			1: {Fgm: metricsinfo.FlowGraphMetric{
 				MinFlowGraphTt: tsoutil.ComposeTSByTime(now, 0),
 				NumFlowGraph:   1,
-			},
-		}}
-		factor, err = quotaCenter.timeTickDelay()
+			}}}
+		ts, err := quotaCenter.tsoAllocator.GenerateTSO(1)
 		assert.NoError(t, err)
+		factor = quotaCenter.timeTickDelay(ts)
 		assert.Equal(t, float64(0), factor)
 
 		// test one-third time tick delay
@@ -172,23 +171,54 @@
 			oneThirdTs := tsoutil.ComposeTSByTime(added, 0)
 			return oneThirdTs, nil
 		}
-		quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{
-			Fgm: metricsinfo.FlowGraphMetric{
+		quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{
+			1: {Fgm: metricsinfo.FlowGraphMetric{
 				MinFlowGraphTt: tsoutil.ComposeTSByTime(now, 0),
 				NumFlowGraph:   1,
-			},
-		}}
-		factor, err = quotaCenter.timeTickDelay()
+			}}}
+		ts, err = quotaCenter.tsoAllocator.GenerateTSO(1)
 		assert.NoError(t, err)
+		factor = quotaCenter.timeTickDelay(ts)
 		ok := math.Abs(factor-2.0/3.0) < 0.0001
 		assert.True(t, ok)
+	})
 
-		// test with error
-		alloc.GenerateTSOF = func(count uint32) (typeutil.Timestamp, error) {
-			return 0, fmt.Errorf("mock err")
+	t.Run("test timeTickDelay factors", func(t *testing.T) {
+		quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator)
+		type ttCase struct {
+			maxTtDelay     time.Duration
+			curTt          time.Time
+			fgTt           time.Time
+			expectedFactor float64
 		}
-		_, err = quotaCenter.timeTickDelay()
-		assert.Error(t, err)
+		t0 := time.Now()
+		ttCases := []ttCase{
+			{10 * time.Second, t0, t0.Add(1 * time.Second), 1},
+			{10 * time.Second, t0, t0, 1},
+			{10 * time.Second, t0.Add(1 * time.Second), t0, 0.9},
+			{10 * time.Second, t0.Add(2 * time.Second), t0, 0.8},
+			{10 * time.Second, t0.Add(5 * time.Second), t0, 0.5},
+			{10 * time.Second, t0.Add(7 * time.Second), t0, 0.3},
+			{10 * time.Second, t0.Add(9 * time.Second), t0, 0.1},
+			{10 * time.Second, t0.Add(10 * time.Second), t0, 0},
+			{10 * time.Second, t0.Add(100 * time.Second), t0, 0},
+		}
+
+		backup := Params.QuotaConfig.MaxTimeTickDelay
+
+		for i, c := range ttCases {
+			Params.QuotaConfig.MaxTimeTickDelay = c.maxTtDelay
+			fgTs := tsoutil.ComposeTSByTime(c.fgTt, 0)
+			quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{1: {Fgm: metricsinfo.FlowGraphMetric{NumFlowGraph: 1, MinFlowGraphTt: fgTs}}}
+			curTs := tsoutil.ComposeTSByTime(c.curTt, 0)
+			factor := quotaCenter.timeTickDelay(curTs)
+			if math.Abs(factor-c.expectedFactor) > 0.000001 {
+				t.Errorf("case %d failed: curTs[%v], fgTs[%v], expectedFactor: %f, actualFactor: %f",
+					i, c.curTt, c.fgTt, c.expectedFactor, factor)
+			}
+		}
+
+		Params.QuotaConfig.MaxTimeTickDelay = backup
 	})
 
 	t.Run("test checkNQInQuery", func(t *testing.T) {
@@ -199,20 +229,18 @@
 		// test cool off
 		Params.QuotaConfig.QueueProtectionEnabled = true
 		Params.QuotaConfig.NQInQueueThreshold = 100
-		quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{
-			SearchQueue: metricsinfo.ReadInfoInQueue{
+		quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{
+			1: {SearchQueue: metricsinfo.ReadInfoInQueue{
 				UnsolvedQueue: Params.QuotaConfig.NQInQueueThreshold,
-			},
-		}}
+			}}}
 		factor = quotaCenter.checkNQInQuery()
 		assert.Equal(t, Params.QuotaConfig.CoolOffSpeed, factor)
 
 		// test no cool off
-		quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{
-			SearchQueue: metricsinfo.ReadInfoInQueue{
+		quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{
+			1: {SearchQueue: metricsinfo.ReadInfoInQueue{
 				UnsolvedQueue: Params.QuotaConfig.NQInQueueThreshold - 1,
-			},
-		}}
+			}}}
 		factor = quotaCenter.checkNQInQuery()
 		assert.Equal(t, 1.0, factor)
 		//ok := math.Abs(factor-1.0) < 0.0001
@@ -228,20 +256,18 @@
 		Params.QuotaConfig.QueueProtectionEnabled = true
 		Params.QuotaConfig.QueueLatencyThreshold = float64(3 * time.Second)
 
-		quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{
-			SearchQueue: metricsinfo.ReadInfoInQueue{
+		quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{
+			1: {SearchQueue: metricsinfo.ReadInfoInQueue{
 				AvgQueueDuration: time.Duration(Params.QuotaConfig.QueueLatencyThreshold),
-			},
-		}}
+			}}}
 		factor = quotaCenter.checkQueryLatency()
 		assert.Equal(t, Params.QuotaConfig.CoolOffSpeed, factor)
 
 		// test no cool off
-		quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{
-			SearchQueue: metricsinfo.ReadInfoInQueue{
+		quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{
+			1: {SearchQueue: metricsinfo.ReadInfoInQueue{
 				AvgQueueDuration: 1 * time.Second,
-			},
-		}}
+			}}}
 		factor = quotaCenter.checkQueryLatency()
 		assert.Equal(t, 1.0, factor)
 	})
@@ -255,65 +281,59 @@
 		Params.QuotaConfig.ResultProtectionEnabled = true
 		Params.QuotaConfig.MaxReadResultRate = 1
 
-		quotaCenter.proxyMetrics = []*metricsinfo.ProxyQuotaMetrics{{
-			Rms: []metricsinfo.RateMetric{
+		quotaCenter.proxyMetrics = map[UniqueID]*metricsinfo.ProxyQuotaMetrics{
+			1: {Rms: []metricsinfo.RateMetric{
 				{Label: metricsinfo.ReadResultThroughput, Rate: 1.2},
-			},
-		}}
+			}}}
 		factor = quotaCenter.checkReadResultRate()
 		assert.Equal(t, Params.QuotaConfig.CoolOffSpeed, factor)
 
 		// test no cool off
-		quotaCenter.proxyMetrics = []*metricsinfo.ProxyQuotaMetrics{{
-			Rms: []metricsinfo.RateMetric{
+		quotaCenter.proxyMetrics = map[UniqueID]*metricsinfo.ProxyQuotaMetrics{
+			1: {Rms: []metricsinfo.RateMetric{
 				{Label: metricsinfo.ReadResultThroughput, Rate: 0.8},
-			},
-		}}
+			}}}
 		factor = quotaCenter.checkReadResultRate()
 		assert.Equal(t, 1.0, factor)
 	})
 
 	t.Run("test calculateReadRates", func(t *testing.T) {
 		quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator)
-		quotaCenter.proxyMetrics = []*metricsinfo.ProxyQuotaMetrics{{
-			Rms: []metricsinfo.RateMetric{
+		quotaCenter.proxyMetrics = map[UniqueID]*metricsinfo.ProxyQuotaMetrics{
+			1: {Rms: []metricsinfo.RateMetric{
 				{Label: internalpb.RateType_DQLSearch.String(), Rate: 100},
 				{Label: internalpb.RateType_DQLQuery.String(), Rate: 100},
-			},
-		}}
+			}}}
 
 		Params.QuotaConfig.ForceDenyReading = false
 		Params.QuotaConfig.QueueProtectionEnabled = true
 		Params.QuotaConfig.QueueLatencyThreshold = 100
-		quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{
-			SearchQueue: metricsinfo.ReadInfoInQueue{
+		quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{
+			1: {SearchQueue: metricsinfo.ReadInfoInQueue{
 				AvgQueueDuration: time.Duration(Params.QuotaConfig.QueueLatencyThreshold),
-			},
-		}}
+			}}}
 		quotaCenter.calculateReadRates()
 		assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLSearch])
 		assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLQuery])
 
 		Params.QuotaConfig.NQInQueueThreshold = 100
-		quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{
-			SearchQueue: metricsinfo.ReadInfoInQueue{
+		quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{
+			1: {SearchQueue: metricsinfo.ReadInfoInQueue{
 				UnsolvedQueue: Params.QuotaConfig.NQInQueueThreshold,
-			},
-		}}
+			}}}
 		quotaCenter.calculateReadRates()
 		assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLSearch])
 		assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLQuery])
 
 		Params.QuotaConfig.ResultProtectionEnabled = true
 		Params.QuotaConfig.MaxReadResultRate = 1
-		quotaCenter.proxyMetrics = []*metricsinfo.ProxyQuotaMetrics{{
-			Rms: []metricsinfo.RateMetric{
+		quotaCenter.proxyMetrics = map[UniqueID]*metricsinfo.ProxyQuotaMetrics{
+			1: {Rms: []metricsinfo.RateMetric{
 				{Label: internalpb.RateType_DQLSearch.String(), Rate: 100},
 				{Label: internalpb.RateType_DQLQuery.String(), Rate: 100},
 				{Label: metricsinfo.ReadResultThroughput, Rate: 1.2},
-			},
-		}}
-		quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{SearchQueue: metricsinfo.ReadInfoInQueue{}}}
+			}}}
+		quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{1: {SearchQueue: metricsinfo.ReadInfoInQueue{}}}
 		quotaCenter.calculateReadRates()
 		assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLSearch])
 		assert.Equal(t, Limit(100.0*0.9), quotaCenter.currentRates[internalpb.RateType_DQLQuery])
@@ -348,10 +368,10 @@
 		quotaCenter := NewQuotaCenter(pcm, &queryCoordMockForQuota{}, &dataCoordMockForQuota{}, core.tsoAllocator)
 		factor := quotaCenter.memoryToWaterLevel()
 		assert.Equal(t, float64(1), factor)
-		quotaCenter.dataNodeMetrics = []*metricsinfo.DataNodeQuotaMetrics{{Hms: metricsinfo.HardwareMetrics{MemoryUsage: 100, Memory: 100}}}
+		quotaCenter.dataNodeMetrics = map[UniqueID]*metricsinfo.DataNodeQuotaMetrics{1: {Hms: metricsinfo.HardwareMetrics{MemoryUsage: 100, Memory: 100}}}
 		factor = quotaCenter.memoryToWaterLevel()
 		assert.Equal(t, float64(0), factor)
-		quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{Hms: metricsinfo.HardwareMetrics{MemoryUsage: 100, Memory: 100}}}
+		quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{1: {Hms: metricsinfo.HardwareMetrics{MemoryUsage: 100, Memory: 100}}}
 		factor = quotaCenter.memoryToWaterLevel()
 		assert.Equal(t, float64(0), factor)
 	})
@@ -385,8 +405,7 @@
 		for i, c := range memCases {
 			Params.QuotaConfig.QueryNodeMemoryLowWaterLevel = c.lowWater
 			Params.QuotaConfig.QueryNodeMemoryHighWaterLevel = c.highWater
-			quotaCenter.queryNodeMetrics = []*metricsinfo.QueryNodeQuotaMetrics{{
-				Hms: metricsinfo.HardwareMetrics{MemoryUsage: c.memUsage, Memory: c.memTotal}}}
+			quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{1: {Hms: metricsinfo.HardwareMetrics{MemoryUsage: c.memUsage, Memory: c.memTotal}}}
 			factor := quotaCenter.memoryToWaterLevel()
 			if math.Abs(factor-c.expectedFactor) > 0.000001 {
 				t.Errorf("case %d failed: waterLever[low:%v, high:%v], memMetric[used:%d, total:%d], expectedFactor: %f, actualFactor: %f",
diff --git a/internal/util/paramtable/quota_param.go b/internal/util/paramtable/quota_param.go
index bc1ff531b3..a1934ff33d 100644
--- a/internal/util/paramtable/quota_param.go
+++ b/internal/util/paramtable/quota_param.go
@@ -523,10 +523,13 @@ func (p *quotaConfig) initDiskQuota() {
 	p.DiskQuota = p.Base.ParseFloatWithDefault("quotaAndLimits.limitWriting.diskProtection.diskQuota", defaultDiskQuotaInMB)
 	// (0, +inf)
 	if p.DiskQuota <= 0 {
-		log.Warn("DiskQuota must in the range of `(0, +inf)`, use default +inf", zap.Float64("DiskQuota", p.DiskQuota))
 		p.DiskQuota = defaultDiskQuotaInMB
 	}
-	log.Debug("init disk quota", zap.Float64("diskQuota(MB)", p.DiskQuota))
+	if p.DiskQuota < defaultDiskQuotaInMB {
+		log.Debug("init disk quota", zap.Float64("diskQuota(MB)", p.DiskQuota))
+	} else {
+		log.Debug("init disk quota", zap.String("diskQuota(MB)", "+inf"))
+	}
 	// megabytes to bytes
 	p.DiskQuota = megaBytes2Bytes(p.DiskQuota)
 }
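Reviewer note: the table-driven ttCases added to quota_center_test.go encode the intended mapping from observed time-tick delay to a write-rate factor. Below is a minimal standalone sketch of that mapping; the ttFactor helper, its parameter names, and the main driver are illustrative only and are not part of this patch.

// ttfactor_sketch.go — illustrative only; mirrors the delay-to-factor behaviour
// of the reworked QuotaCenter.timeTickDelay as exercised by the ttCases table.
package main

import (
	"fmt"
	"time"
)

// ttFactor returns 1 when there is no delay (or protection is disabled),
// ramps down linearly as the observed delay approaches the tolerated maximum,
// and returns 0 once the delay reaches or exceeds that maximum.
func ttFactor(maxDelay, maxTolerable time.Duration) float64 {
	if maxTolerable <= 0 { // protection disabled
		return 1
	}
	if maxDelay >= maxTolerable {
		return 0
	}
	return float64(maxTolerable.Nanoseconds()-maxDelay.Nanoseconds()) / float64(maxTolerable.Nanoseconds())
}

func main() {
	// Reproduces a few rows of the ttCases table (maxTtDelay = 10s).
	maxTolerable := 10 * time.Second
	for _, delay := range []time.Duration{0, time.Second, 5 * time.Second, 10 * time.Second, 100 * time.Second} {
		fmt.Printf("delay=%v factor=%.1f\n", delay, ttFactor(delay, maxTolerable))
	}
	// Expected factors: 1.0, 0.9, 0.5, 0.0, 0.0
}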
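Likewise, a hedged sketch of how the new per-node gauge would surface once registered. It assumes milvusNamespace resolves to "milvus", typeutil.RootCoordRole to "rootcoord", and nodeIDLabelName to "node_id"; none of those values appear in this diff, so the resulting series name milvus_rootcoord_time_tick_delay is an assumption, not something the patch states.

// metric_name_sketch.go — illustrative only; the literal namespace, subsystem,
// and label values below are assumptions, not confirmed by this diff.
package main

import (
	"fmt"
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	ttDelay := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "milvus",    // assumed value of milvusNamespace
		Subsystem: "rootcoord", // assumed value of typeutil.RootCoordRole
		Name:      "time_tick_delay",
		Help:      "The max time tick delay of flow graphs",
	}, []string{"node_id"}) // assumed value of nodeIDLabelName

	reg := prometheus.NewRegistry()
	reg.MustRegister(ttDelay)

	// Per-node update, matching the strconv.FormatInt(nodeID, 10) call in QuotaCenter.timeTickDelay.
	var nodeID int64 = 1
	ttDelay.WithLabelValues(strconv.FormatInt(nodeID, 10)).Set(1500) // delay in milliseconds

	// Prints 1500 for the series milvus_rootcoord_time_tick_delay{node_id="1"}.
	fmt.Println(testutil.ToFloat64(ttDelay.WithLabelValues("1")))
}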