diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 3b5658a7c7..726efccdeb 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -679,6 +679,7 @@ dataCoord: dropTolerance: 10800 # The retention duration of the binlog files of the deleted segments before they are cleared, unit: second. removeConcurrent: 32 # number of concurrent goroutines to remove dropped s3 objects scanInterval: 168 # orphan file (file on oss but has not been registered on meta) on object storage garbage collection scanning interval in hours + slowDownCPUUsageThreshold: 0.6 # The CPU usage threshold at which the garbage collection will be slowed down enableActiveStandby: false brokerTimeout: 5000 # 5000ms, dataCoord broker rpc timeout autoBalance: true # Enable auto balance diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index 2909653a7d..98c2df3fbe 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -39,6 +39,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/conc" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" + "github.com/milvus-io/milvus/pkg/v2/util/hardware" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/metautil" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" @@ -73,13 +74,42 @@ type garbageCollector struct { wg sync.WaitGroup cmdCh chan gcCmd pauseUntil atomic.Time + + systemMetricsListener *hardware.SystemMetricsListener } + type gcCmd struct { cmdType datapb.GcCommand duration time.Duration done chan struct{} } +// newSystemMetricsListener creates a system metrics listener for garbage collector. +// used to slow down the garbage collector when cpu usage is high. 
+func newSystemMetricsListener(opt *GcOption) *hardware.SystemMetricsListener { + return &hardware.SystemMetricsListener{ + Cooldown: 15 * time.Second, + Context: false, + Condition: func(metrics hardware.SystemMetrics, listener *hardware.SystemMetricsListener) bool { return true }, + Callback: func(metrics hardware.SystemMetrics, listener *hardware.SystemMetricsListener) { + isSlowDown := listener.Context.(bool) + if metrics.UsedRatio() > paramtable.Get().DataCoordCfg.GCSlowDownCPUUsageThreshold.GetAsFloat() { + if !isSlowDown { + log.Info("garbage collector slow down...", zap.Float64("cpuUsage", metrics.UsedRatio())) + opt.removeObjectPool.Resize(1) + listener.Context = true + } + return + } + if isSlowDown { + log.Info("garbage collector slow down finished", zap.Float64("cpuUsage", metrics.UsedRatio())) + opt.removeObjectPool.Resize(paramtable.Get().DataCoordCfg.GCRemoveConcurrent.GetAsInt()) + listener.Context = false + } + }, + } +} + // newGarbageCollector create garbage collector with meta and option func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageCollector { log.Info("GC with option", @@ -91,12 +121,13 @@ func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageColl opt.removeObjectPool = conc.NewPool[struct{}](Params.DataCoordCfg.GCRemoveConcurrent.GetAsInt(), conc.WithExpiryDuration(time.Minute)) ctx, cancel := context.WithCancel(context.Background()) return &garbageCollector{ - ctx: ctx, - cancel: cancel, - meta: meta, - handler: handler, - option: opt, - cmdCh: make(chan gcCmd), + ctx: ctx, + cancel: cancel, + meta: meta, + handler: handler, + option: opt, + cmdCh: make(chan gcCmd), + systemMetricsListener: newSystemMetricsListener(&opt), } } @@ -182,6 +213,9 @@ func (gc *garbageCollector) work(ctx context.Context) { // startControlLoop start a control loop for garbageCollector. 
func (gc *garbageCollector) startControlLoop(_ context.Context) { + hardware.RegisterSystemMetricsListener(gc.systemMetricsListener) + defer hardware.UnregisterSystemMetricsListener(gc.systemMetricsListener) + for { select { case cmd := <-gc.cmdCh: @@ -337,6 +371,7 @@ func (gc *garbageCollector) recycleUnusedBinLogWithChecker(ctx context.Context, // ignore error since it could be cleaned up next time file := chunkInfo.FilePath + future := gc.option.removeObjectPool.Submit(func() (struct{}, error) { logger := logger.With(zap.String("file", file)) logger.Info("garbageCollector recycleUnusedBinlogFiles remove file...") diff --git a/internal/streamingnode/server/wal/interceptors/shard/stats/stats_seal_worker.go b/internal/streamingnode/server/wal/interceptors/shard/stats/stats_seal_worker.go index 7656ec7120..25b6ab7544 100644 --- a/internal/streamingnode/server/wal/interceptors/shard/stats/stats_seal_worker.go +++ b/internal/streamingnode/server/wal/interceptors/shard/stats/stats_seal_worker.go @@ -56,11 +56,11 @@ func (m *sealWorker) loop() { timer := time.NewTicker(m.timePolicyCheckInterval) listener := &hardware.SystemMetricsListener{ Cooldown: 30 * time.Second, - Condition: func(sm hardware.SystemMetrics) bool { + Condition: func(sm hardware.SystemMetrics, _ *hardware.SystemMetricsListener) bool { memoryThreshold := m.statsManager.getConfig().memoryThreshold return sm.UsedRatio() > memoryThreshold }, - Callback: func(sm hardware.SystemMetrics) { + Callback: func(sm hardware.SystemMetrics, _ *hardware.SystemMetricsListener) { select { case memoryNotifier <- policy.PolicyNodeMemory(sm.UsedRatio()): // the repeated notify can be ignored. diff --git a/pkg/util/hardware/listener.go b/pkg/util/hardware/listener.go index b35f1555fc..4adcd9a4b0 100644 --- a/pkg/util/hardware/listener.go +++ b/pkg/util/hardware/listener.go @@ -41,9 +41,10 @@ func (s SystemMetrics) String() string { // SystemMetricsListener is a listener that listens for system metrics. 
type SystemMetricsListener struct { nextTriggerInstant time.Time + Context any Cooldown time.Duration - Condition func(SystemMetrics) bool // condition to trigger the callback - Callback func(SystemMetrics) // callback function if the condition met, should be non-blocking. + Condition func(SystemMetrics, *SystemMetricsListener) bool // condition to trigger the callback + Callback func(SystemMetrics, *SystemMetricsListener) // callback function if the condition met, should be non-blocking. } // RegisterSystemMetricsListener registers a listener into global default systemMetricsWatcher. @@ -63,10 +64,10 @@ func getSystemMetricsWatcher() *SystemMericsWatcher { logger := log.With(log.FieldComponent("system-metrics")) warningLoggerListener := &SystemMetricsListener{ Cooldown: 1 * time.Minute, - Condition: func(stats SystemMetrics) bool { + Condition: func(stats SystemMetrics, listener *SystemMetricsListener) bool { return stats.UsedRatio() > 0.9 }, - Callback: func(sm SystemMetrics) { + Callback: func(sm SystemMetrics, listener *SystemMetricsListener) { logger.Warn("memory used ratio is extremely high", zap.String("memory", sm.String()), zap.Float64("usedRatio", sm.UsedRatio())) }, } @@ -150,8 +151,8 @@ func (w *SystemMericsWatcher) updateMetrics() { // cool down. 
continue } - if l.Condition(stats) { - l.Callback(stats) + if l.Condition(stats, l) { + l.Callback(stats, l) l.nextTriggerInstant = now.Add(l.Cooldown) } } diff --git a/pkg/util/hardware/listener_test.go b/pkg/util/hardware/listener_test.go index 7456ceff89..fd54f51d95 100644 --- a/pkg/util/hardware/listener_test.go +++ b/pkg/util/hardware/listener_test.go @@ -13,14 +13,19 @@ func TestListener(t *testing.T) { called := atomic.NewInt32(0) l := &SystemMetricsListener{ Cooldown: 100 * time.Millisecond, - Condition: func(stats SystemMetrics) bool { + Context: false, + Condition: func(stats SystemMetrics, listener *SystemMetricsListener) bool { assert.NotZero(t, stats.UsedMemoryBytes) assert.NotZero(t, stats.TotalMemoryBytes) assert.NotZero(t, stats.UsedRatio()) assert.NotEmpty(t, stats.String()) + assert.False(t, listener.Context.(bool)) + listener.Context = true return true }, - Callback: func(sm SystemMetrics) { + Callback: func(sm SystemMetrics, listener *SystemMetricsListener) { + ctx := listener.Context.(bool) + assert.True(t, ctx) assert.NotZero(t, sm.UsedMemoryBytes) assert.NotZero(t, sm.TotalMemoryBytes) assert.NotZero(t, sm.UsedRatio()) @@ -37,6 +42,7 @@ func TestListener(t *testing.T) { l2 := &SystemMetricsListener{ Cooldown: 100 * time.Millisecond, + Context: false, Condition: l.Condition, Callback: l.Callback, } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 9c0ec98b13..1d40a80c2f 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -4019,13 +4019,14 @@ type dataCoordConfig struct { LevelZeroCompactionTriggerDeltalogMaxNum ParamItem `refreshable:"true"` // Garbage Collection - EnableGarbageCollection ParamItem `refreshable:"false"` - GCInterval ParamItem `refreshable:"false"` - GCMissingTolerance ParamItem `refreshable:"false"` - GCDropTolerance ParamItem `refreshable:"false"` - GCRemoveConcurrent ParamItem `refreshable:"false"` - GCScanIntervalInHour 
ParamItem `refreshable:"false"` - EnableActiveStandby ParamItem `refreshable:"false"` + EnableGarbageCollection ParamItem `refreshable:"false"` + GCInterval ParamItem `refreshable:"false"` + GCMissingTolerance ParamItem `refreshable:"false"` + GCDropTolerance ParamItem `refreshable:"false"` + GCRemoveConcurrent ParamItem `refreshable:"false"` + GCScanIntervalInHour ParamItem `refreshable:"false"` + GCSlowDownCPUUsageThreshold ParamItem `refreshable:"false"` + EnableActiveStandby ParamItem `refreshable:"false"` BindIndexNodeMode ParamItem `refreshable:"false"` IndexNodeAddress ParamItem `refreshable:"false"` @@ -4703,6 +4704,15 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.GCScanIntervalInHour.Init(base.mgr) + p.GCSlowDownCPUUsageThreshold = ParamItem{ + Key: "dataCoord.gc.slowDownCPUUsageThreshold", + Version: "2.6.0", + DefaultValue: "0.6", + Doc: "The CPU usage threshold at which the garbage collection will be slowed down", + Export: true, + } + p.GCSlowDownCPUUsageThreshold.Init(base.mgr) + // Do not set this to incredible small value, make sure this to be more than 10 minutes at least p.GCMissingTolerance = ParamItem{ Key: "dataCoord.gc.missingTolerance", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index cac71cfd51..d6319bdc6f 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -527,6 +527,9 @@ func TestComponentParam(t *testing.T) { params.Save("datacoord.gracefulStopTimeout", "100") assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) + assert.Equal(t, 0.6, Params.GCSlowDownCPUUsageThreshold.GetAsFloat()) + params.Save("dataCoord.gc.slowDownCPUUsageThreshold", "0.5") + assert.Equal(t, 0.5, Params.GCSlowDownCPUUsageThreshold.GetAsFloat()) params.Save("dataCoord.compaction.gcInterval", "100") assert.Equal(t, float64(100), 
Params.CompactionGCIntervalInSeconds.GetAsDuration(time.Second).Seconds()) params.Save("dataCoord.compaction.dropTolerance", "100") diff --git a/tests/integration/sealpolicies/seal_by_total_growing_test.go b/tests/integration/sealpolicies/seal_by_total_growing_test.go index 82dc604874..43c790d555 100644 --- a/tests/integration/sealpolicies/seal_by_total_growing_test.go +++ b/tests/integration/sealpolicies/seal_by_total_growing_test.go @@ -98,7 +98,7 @@ func (s *SealSuite) TestSealByTotalGrowingSegmentsSize() { var segments []*datapb.SegmentInfo segments, err = c.ShowSegments(collectionName) s.NoError(err) - s.NotEmpty(segments) + // segment may be in growing state or cannot be seen in meta right after insert. flushedSegments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool { return segment.GetState() == commonpb.SegmentState_Flushed })