diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 029fea7b34..693da2dadb 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -501,6 +501,13 @@ quotaAndLimits: dataNodeMemoryHighWaterLevel: 0.95 # (0, 1], memoryHighWaterLevel in DataNodes queryNodeMemoryLowWaterLevel: 0.85 # (0, 1], memoryLowWaterLevel in QueryNodes queryNodeMemoryHighWaterLevel: 0.95 # (0, 1], memoryHighWaterLevel in QueryNodes + growingSegmentsSizeProtection: + # 1. No action will be taken if the ratio of growing segments size is less than the low water level. + # 2. The DML rate will be reduced if the ratio of growing segments size is greater than the low water level and less than the high water level. + # 3. All DML requests will be rejected if the ratio of growing segments size is greater than the high water level. + enabled: false + lowWaterLevel: 0.2 + highWaterLevel: 0.4 diskProtection: enabled: true # When the total file size of object storage is greater than `diskQuota`, all dml requests would be rejected; diskQuota: -1 # MB, (0, +inf), default no limit diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index 6771f1a349..71f1c97316 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -20,9 +20,12 @@ import ( "context" "time" + "github.com/samber/lo" + "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus/internal/querynodev2/collector" + "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -104,6 +107,18 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error } minTsafeChannel, minTsafe := node.tSafeManager.Min() + + growingSegments := node.manager.Segment.GetBy(segments.WithType(segments.SegmentTypeGrowing)) + growingSegmentsSize := lo.SumBy(growingSegments, func(seg segments.Segment) int64 { + return seg.MemSize() + }) + + allSegments := node.manager.Segment.GetBy() + collections := typeutil.NewUniqueSet() + for _, segment := range allSegments { + collections.Insert(segment.Collection()) + } + return &metricsinfo.QueryNodeQuotaMetrics{ Hms: metricsinfo.HardwareMetrics{}, Rms: rms, @@ -112,8 +127,13 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error MinFlowGraphTt: minTsafe, NumFlowGraph: node.pipelineManager.Num(), }, - SearchQueue: sqms, - QueryQueue: qqms, + SearchQueue: sqms, + QueryQueue: qqms, + GrowingSegmentsSize: growingSegmentsSize, + Effect: metricsinfo.NodeEffect{ + NodeID: paramtable.GetNodeID(), + CollectionIDs: collections.Collect(), + }, }, nil } diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index dd945ed4d8..92743a01e9 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -80,6 +80,7 @@ type collectionStates = map[milvuspb.QuotaState]commonpb.ErrorCode // 4. DQL Queue length protection -> dqlRate = curDQLRate * CoolOffSpeed // 5. DQL queue latency protection -> dqlRate = curDQLRate * CoolOffSpeed // 6. Search result protection -> searchRate = curSearchRate * CoolOffSpeed +// 7. GrowingSegsSize protection -> dmlRate = maxDMLRate * (high - cur) / (high - low) // // If necessary, user can also manually force to deny RW requests. type QuotaCenter struct { @@ -422,14 +423,23 @@ func (q *QuotaCenter) calculateWriteRates() error { if err != nil { return err } - collectionFactors := q.getTimeTickDelayFactor(ts) - memFactors := q.getMemoryFactor() - for collection, factor := range memFactors { - _, ok := collectionFactors[collection] - if !ok || collectionFactors[collection] > factor { - collectionFactors[collection] = factor + + var collectionFactors map[int64]float64 + updateCollectionFactor := func(factors map[int64]float64) { + for collection, factor := range factors { + _, ok := collectionFactors[collection] + if !ok || collectionFactors[collection] > factor { + collectionFactors[collection] = factor + } } } + + collectionFactors = q.getTimeTickDelayFactor(ts) + memFactors := q.getMemoryFactor() + updateCollectionFactor(memFactors) + growingSegFactors := q.getGrowingSegmentsSizeFactor() + updateCollectionFactor(growingSegFactors) + for collection, factor := range collectionFactors { if factor <= 0 { q.forceDenyWriting(commonpb.ErrorCode_TimeTickLongDelay, collection) @@ -598,6 +608,51 @@ func (q *QuotaCenter) getMemoryFactor() map[int64]float64 { return collectionFactor } +func (q *QuotaCenter) getGrowingSegmentsSizeFactor() map[int64]float64 { + log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0) + if !Params.QuotaConfig.GrowingSegmentsSizeProtectionEnabled.GetAsBool() { + return make(map[int64]float64) + } + + low := Params.QuotaConfig.GrowingSegmentsSizeLowWaterLevel.GetAsFloat() + high := Params.QuotaConfig.GrowingSegmentsSizeHighWaterLevel.GetAsFloat() + + collectionFactor := make(map[int64]float64) + updateCollectionFactor := func(factor float64, collections []int64) { + for _, collection := range collections { + _, ok := collectionFactor[collection] + if !ok || collectionFactor[collection] > factor { + collectionFactor[collection] = factor + } + } + } + for nodeID, metric := range q.queryNodeMetrics { + cur := float64(metric.GrowingSegmentsSize) / float64(metric.Hms.Memory) + if cur <= low { + continue + } + if cur >= high { + log.RatedWarn(10, "QuotaCenter: QueryNode growing segments size to high water level", + zap.String("Node", fmt.Sprintf("%s-%d", typeutil.QueryNodeRole, nodeID)), + zap.Int64s("collections", metric.Effect.CollectionIDs), + zap.Int64("segmentsSize", metric.GrowingSegmentsSize), + zap.Uint64("TotalMem", metric.Hms.Memory), + zap.Float64("highWaterLevel", high)) + updateCollectionFactor(0, metric.Effect.CollectionIDs) + continue + } + factor := (high - cur) / (high - low) + updateCollectionFactor(factor, metric.Effect.CollectionIDs) + log.RatedWarn(10, "QuotaCenter: QueryNode growing segments size to low water level, limit writing rate", + zap.String("Node", fmt.Sprintf("%s-%d", typeutil.QueryNodeRole, nodeID)), + zap.Int64s("collections", metric.Effect.CollectionIDs), + zap.Int64("segmentsSize", metric.GrowingSegmentsSize), + zap.Uint64("TotalMem", metric.Hms.Memory), + zap.Float64("lowWaterLevel", low)) + } + return collectionFactor +} + // calculateRates calculates target rates by different strategies. func (q *QuotaCenter) calculateRates() error { q.resetAllCurrentRates() diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index 2f49b812d9..dedcb435f9 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -397,6 +397,58 @@ func TestQuotaCenter(t *testing.T) { paramtable.Get().Reset(Params.QuotaConfig.QueryNodeMemoryHighWaterLevel.Key) }) + t.Run("test GrowingSegmentsSize factors", func(t *testing.T) { + qc := types.NewMockQueryCoord(t) + quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) + tests := []struct { + low float64 + high float64 + growingSize int64 + memTotal uint64 + expectedFactor float64 + }{ + {0.8, 0.9, 10, 100, 1}, + {0.8, 0.9, 80, 100, 1}, + {0.8, 0.9, 82, 100, 0.8}, + {0.8, 0.9, 85, 100, 0.5}, + {0.8, 0.9, 88, 100, 0.2}, + {0.8, 0.9, 90, 100, 0}, + + {0.85, 0.95, 25, 100, 1}, + {0.85, 0.95, 85, 100, 1}, + {0.85, 0.95, 87, 100, 0.8}, + {0.85, 0.95, 90, 100, 0.5}, + {0.85, 0.95, 93, 100, 0.2}, + {0.85, 0.95, 95, 100, 0}, + } + + quotaCenter.writableCollections = append(quotaCenter.writableCollections, 1, 2, 3) + paramtable.Get().Save(Params.QuotaConfig.GrowingSegmentsSizeProtectionEnabled.Key, "true") + for _, test := range tests { + paramtable.Get().Save(Params.QuotaConfig.GrowingSegmentsSizeLowWaterLevel.Key, fmt.Sprintf("%f", test.low)) + paramtable.Get().Save(Params.QuotaConfig.GrowingSegmentsSizeHighWaterLevel.Key, fmt.Sprintf("%f", test.high)) + quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{ + 1: { + Hms: metricsinfo.HardwareMetrics{ + Memory: test.memTotal, + }, + Effect: metricsinfo.NodeEffect{ + NodeID: 1, + CollectionIDs: []int64{1, 2, 3}, + }, + GrowingSegmentsSize: test.growingSize, + }, + } + factors := quotaCenter.getGrowingSegmentsSizeFactor() + + for _, factor := range factors { + assert.True(t, math.Abs(factor-test.expectedFactor) < 0.01) + } + } + paramtable.Get().Reset(Params.QuotaConfig.GrowingSegmentsSizeLowWaterLevel.Key) + paramtable.Get().Reset(Params.QuotaConfig.GrowingSegmentsSizeHighWaterLevel.Key) + }) + t.Run("test checkDiskQuota", func(t *testing.T) { qc := types.NewMockQueryCoord(t) quotaCenter := NewQuotaCenter(pcm, qc, &dataCoordMockForQuota{}, core.tsoAllocator) diff --git a/pkg/util/metricsinfo/quota_metric.go b/pkg/util/metricsinfo/quota_metric.go index 48ba8fdc33..84ffddf348 100644 --- a/pkg/util/metricsinfo/quota_metric.go +++ b/pkg/util/metricsinfo/quota_metric.go @@ -75,12 +75,13 @@ type NodeEffect struct { // QueryNodeQuotaMetrics are metrics of QueryNode. type QueryNodeQuotaMetrics struct { - Hms HardwareMetrics - Rms []RateMetric - Fgm FlowGraphMetric - SearchQueue ReadInfoInQueue - QueryQueue ReadInfoInQueue - Effect NodeEffect + Hms HardwareMetrics + Rms []RateMetric + Fgm FlowGraphMetric + SearchQueue ReadInfoInQueue + QueryQueue ReadInfoInQueue + GrowingSegmentsSize int64 + Effect NodeEffect } type DataCoordQuotaMetrics struct { diff --git a/pkg/util/paramtable/quota_param.go b/pkg/util/paramtable/quota_param.go index ae332055aa..923a91c201 100644 --- a/pkg/util/paramtable/quota_param.go +++ b/pkg/util/paramtable/quota_param.go @@ -91,17 +91,20 @@ type quotaConfig struct { MaxCollectionNumPerDB ParamItem `refreshable:"true"` // limit writing - ForceDenyWriting ParamItem `refreshable:"true"` - TtProtectionEnabled ParamItem `refreshable:"true"` - MaxTimeTickDelay ParamItem `refreshable:"true"` - MemProtectionEnabled ParamItem `refreshable:"true"` - DataNodeMemoryLowWaterLevel ParamItem `refreshable:"true"` - DataNodeMemoryHighWaterLevel ParamItem `refreshable:"true"` - QueryNodeMemoryLowWaterLevel ParamItem `refreshable:"true"` - QueryNodeMemoryHighWaterLevel ParamItem `refreshable:"true"` - DiskProtectionEnabled ParamItem `refreshable:"true"` - DiskQuota ParamItem `refreshable:"true"` - DiskQuotaPerCollection ParamItem `refreshable:"true"` + ForceDenyWriting ParamItem `refreshable:"true"` + TtProtectionEnabled ParamItem `refreshable:"true"` + MaxTimeTickDelay ParamItem `refreshable:"true"` + MemProtectionEnabled ParamItem `refreshable:"true"` + DataNodeMemoryLowWaterLevel ParamItem `refreshable:"true"` + DataNodeMemoryHighWaterLevel ParamItem `refreshable:"true"` + QueryNodeMemoryLowWaterLevel ParamItem `refreshable:"true"` + QueryNodeMemoryHighWaterLevel ParamItem `refreshable:"true"` + GrowingSegmentsSizeProtectionEnabled ParamItem `refreshable:"true"` + GrowingSegmentsSizeLowWaterLevel ParamItem `refreshable:"true"` + GrowingSegmentsSizeHighWaterLevel ParamItem `refreshable:"true"` + DiskProtectionEnabled ParamItem `refreshable:"true"` + DiskQuota ParamItem `refreshable:"true"` + DiskQuotaPerCollection ParamItem `refreshable:"true"` // limit reading ForceDenyReading ParamItem `refreshable:"true"` @@ -880,6 +883,52 @@ When memory usage < memoryLowWaterLevel, no action.`, } p.QueryNodeMemoryHighWaterLevel.Init(base.mgr) + p.GrowingSegmentsSizeProtectionEnabled = ParamItem{ + Key: "quotaAndLimits.limitWriting.growingSegmentsSizeProtection.enabled", + Version: "2.2.9", + DefaultValue: "false", + Doc: `1. No action will be taken if the ratio of growing segments size is less than the low water level. +2. The DML rate will be reduced if the ratio of growing segments size is greater than the low water level and less than the high water level. +3. All DML requests will be rejected if the ratio of growing segments size is greater than the high water level.`, + Export: true, + } + p.GrowingSegmentsSizeProtectionEnabled.Init(base.mgr) + + defaultGrowingSegSizeLowWaterLevel := "0.2" + p.GrowingSegmentsSizeLowWaterLevel = ParamItem{ + Key: "quotaAndLimits.limitWriting.growingSegmentsSizeProtection.lowWaterLevel", + Version: "2.2.9", + DefaultValue: defaultGrowingSegSizeLowWaterLevel, + Formatter: func(v string) string { + level := getAsFloat(v) + if level <= 0 || level > 1 { + return defaultGrowingSegSizeLowWaterLevel + } + return v + }, + Export: true, + } + p.GrowingSegmentsSizeLowWaterLevel.Init(base.mgr) + + defaultGrowingSegSizeHighWaterLevel := "0.4" + p.GrowingSegmentsSizeHighWaterLevel = ParamItem{ + Key: "quotaAndLimits.limitWriting.growingSegmentsSizeProtection.highWaterLevel", + Version: "2.2.9", + DefaultValue: defaultGrowingSegSizeHighWaterLevel, + Formatter: func(v string) string { + level := getAsFloat(v) + if level <= 0 || level > 1 { + return defaultGrowingSegSizeHighWaterLevel + } + if !p.checkMinMaxLegal(p.GrowingSegmentsSizeLowWaterLevel.GetAsFloat(), getAsFloat(v)) { + return defaultGrowingSegSizeHighWaterLevel + } + return v + }, + Export: true, + } + p.GrowingSegmentsSizeHighWaterLevel.Init(base.mgr) + p.DiskProtectionEnabled = ParamItem{ Key: "quotaAndLimits.limitWriting.diskProtection.enabled", Version: "2.2.0", diff --git a/pkg/util/paramtable/quota_param_test.go b/pkg/util/paramtable/quota_param_test.go index 62a179d514..739765d6f8 100644 --- a/pkg/util/paramtable/quota_param_test.go +++ b/pkg/util/paramtable/quota_param_test.go @@ -122,6 +122,9 @@ func TestQuotaParam(t *testing.T) { assert.Equal(t, defaultHighWaterLevel, qc.DataNodeMemoryHighWaterLevel.GetAsFloat()) assert.Equal(t, defaultLowWaterLevel, qc.QueryNodeMemoryLowWaterLevel.GetAsFloat()) assert.Equal(t, defaultHighWaterLevel, qc.QueryNodeMemoryHighWaterLevel.GetAsFloat()) + assert.Equal(t, false, qc.GrowingSegmentsSizeProtectionEnabled.GetAsBool()) + assert.Equal(t, 0.2, qc.GrowingSegmentsSizeLowWaterLevel.GetAsFloat()) + assert.Equal(t, 0.4, qc.GrowingSegmentsSizeHighWaterLevel.GetAsFloat()) assert.Equal(t, true, qc.DiskProtectionEnabled.GetAsBool()) assert.Equal(t, defaultMax, qc.DiskQuota.GetAsFloat()) assert.Equal(t, defaultMax, qc.DiskQuotaPerCollection.GetAsFloat())