enhance: limit the gc concurrency when cpu is high (#43059)

issue: #42833

Signed-off-by: chyezh <chyezh@outlook.com>
Zhen Ye 2025-07-04 09:22:43 +08:00 committed by GitHub
parent 1d9a9a993d
commit e97e44d56e
8 changed files with 80 additions and 24 deletions


@@ -679,6 +679,7 @@ dataCoord:
     dropTolerance: 10800 # The retention duration of the binlog files of the deleted segments before they are cleared, unit: second.
     removeConcurrent: 32 # number of concurrent goroutines to remove dropped s3 objects
     scanInterval: 168 # orphan file (file on oss but has not been registered on meta) on object storage garbage collection scanning interval in hours
+    slowDownCPUUsageThreshold: 0.6 # The CPU usage threshold at which the garbage collection will be slowed down
   enableActiveStandby: false
   brokerTimeout: 5000 # 5000ms, dataCoord broker rpc timeout
   autoBalance: true # Enable auto balance
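
The new knob works together with removeConcurrent above: when CPU usage exceeds the threshold, the GC's object-removal pool is shrunk to a single worker, and it is restored to removeConcurrent once usage falls back (see the listener later in this diff). A minimal sketch of that decision, assuming the defaults shown here; targetConcurrency is a hypothetical helper, not code from the commit:

// targetConcurrency mirrors the slow-down rule: one worker above the
// threshold, the configured concurrency otherwise. (Hypothetical helper.)
func targetConcurrency(cpuUsedRatio float64) int {
    const slowDownCPUUsageThreshold = 0.6 // dataCoord.gc.slowDownCPUUsageThreshold
    const removeConcurrent = 32           // dataCoord.gc.removeConcurrent
    if cpuUsedRatio > slowDownCPUUsageThreshold {
        return 1
    }
    return removeConcurrent
}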


@@ -39,6 +39,7 @@ import (
     "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
     "github.com/milvus-io/milvus/pkg/v2/util/conc"
     "github.com/milvus-io/milvus/pkg/v2/util/funcutil"
+    "github.com/milvus-io/milvus/pkg/v2/util/hardware"
     "github.com/milvus-io/milvus/pkg/v2/util/merr"
     "github.com/milvus-io/milvus/pkg/v2/util/metautil"
     "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
@@ -73,13 +74,42 @@ type garbageCollector struct {
     wg         sync.WaitGroup
     cmdCh      chan gcCmd
     pauseUntil atomic.Time
+    systemMetricsListener *hardware.SystemMetricsListener
 }

 type gcCmd struct {
     cmdType  datapb.GcCommand
     duration time.Duration
     done     chan struct{}
 }

+// newSystemMetricsListener creates a system metrics listener for garbage collector.
+// used to slow down the garbage collector when cpu usage is high.
+func newSystemMetricsListener(opt *GcOption) *hardware.SystemMetricsListener {
+    return &hardware.SystemMetricsListener{
+        Cooldown:  15 * time.Second,
+        Context:   false,
+        Condition: func(metrics hardware.SystemMetrics, listener *hardware.SystemMetricsListener) bool { return true },
+        Callback: func(metrics hardware.SystemMetrics, listener *hardware.SystemMetricsListener) {
+            isSlowDown := listener.Context.(bool)
+            if metrics.UsedRatio() > paramtable.Get().DataCoordCfg.GCSlowDownCPUUsageThreshold.GetAsFloat() {
+                if !isSlowDown {
+                    log.Info("garbage collector slow down...", zap.Float64("cpuUsage", metrics.UsedRatio()))
+                    opt.removeObjectPool.Resize(1)
+                    listener.Context = true
+                }
+                return
+            }
+            if isSlowDown {
+                log.Info("garbage collector slow down finished", zap.Float64("cpuUsage", metrics.UsedRatio()))
+                opt.removeObjectPool.Resize(paramtable.Get().DataCoordCfg.GCRemoveConcurrent.GetAsInt())
+                listener.Context = false
+            }
+        },
+    }
+}
+
 // newGarbageCollector create garbage collector with meta and option
 func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageCollector {
     log.Info("GC with option",
@@ -91,12 +121,13 @@ func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageColl
     opt.removeObjectPool = conc.NewPool[struct{}](Params.DataCoordCfg.GCRemoveConcurrent.GetAsInt(), conc.WithExpiryDuration(time.Minute))
     ctx, cancel := context.WithCancel(context.Background())
     return &garbageCollector{
        ctx:     ctx,
        cancel:  cancel,
        meta:    meta,
        handler: handler,
        option:  opt,
        cmdCh:   make(chan gcCmd),
+       systemMetricsListener: newSystemMetricsListener(&opt),
     }
 }
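
newSystemMetricsListener receives &opt, so the callback resizes the very pool instance created on the first line of this hunk. A sketch of the resize pattern in isolation, using only conc calls that appear in this diff (and assuming Resize takes effect for tasks scheduled after the call):

import (
    "time"

    "github.com/milvus-io/milvus/pkg/v2/util/conc"
)

// resizeSketch shows the throttle/restore pattern used by the GC listener.
func resizeSketch() {
    pool := conc.NewPool[struct{}](32, conc.WithExpiryDuration(time.Minute))
    pool.Resize(1)  // CPU pressure: pending removals drain through one worker
    pool.Resize(32) // pressure cleared: restore the configured concurrency
}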
@@ -182,6 +213,9 @@ func (gc *garbageCollector) work(ctx context.Context) {
 // startControlLoop start a control loop for garbageCollector.
 func (gc *garbageCollector) startControlLoop(_ context.Context) {
+    hardware.RegisterSystemMetricsListener(gc.systemMetricsListener)
+    defer hardware.UnregisterSystemMetricsListener(gc.systemMetricsListener)
+
     for {
         select {
         case cmd := <-gc.cmdCh:
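
Registering here and unregistering via defer ties the listener's lifetime to the control loop, so a stopped GC no longer reacts to CPU samples. The same pattern in isolation (a sketch; runWithListener and the stop channel are illustrative, not part of the commit):

import "github.com/milvus-io/milvus/pkg/v2/util/hardware"

// runWithListener keeps l registered only while the caller wants it active.
func runWithListener(l *hardware.SystemMetricsListener, stop <-chan struct{}) {
    hardware.RegisterSystemMetricsListener(l)
    defer hardware.UnregisterSystemMetricsListener(l)
    <-stop // listener is removed from the watcher as soon as we return
}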
@@ -337,6 +371,7 @@ func (gc *garbageCollector) recycleUnusedBinLogWithChecker(ctx context.Context,
     // ignore error since it could be cleaned up next time
     file := chunkInfo.FilePath
     future := gc.option.removeObjectPool.Submit(func() (struct{}, error) {
         logger := logger.With(zap.String("file", file))
         logger.Info("garbageCollector recycleUnusedBinlogFiles remove file...")


@@ -56,11 +56,11 @@ func (m *sealWorker) loop() {
     timer := time.NewTicker(m.timePolicyCheckInterval)
     listener := &hardware.SystemMetricsListener{
         Cooldown: 30 * time.Second,
-        Condition: func(sm hardware.SystemMetrics) bool {
+        Condition: func(sm hardware.SystemMetrics, _ *hardware.SystemMetricsListener) bool {
             memoryThreshold := m.statsManager.getConfig().memoryThreshold
             return sm.UsedRatio() > memoryThreshold
         },
-        Callback: func(sm hardware.SystemMetrics) {
+        Callback: func(sm hardware.SystemMetrics, _ *hardware.SystemMetricsListener) {
             select {
             case memoryNotifier <- policy.PolicyNodeMemory(sm.UsedRatio()):
                 // the repeated notify can be ignored.


@@ -41,9 +41,10 @@ func (s SystemMetrics) String() string {
 // SystemMetricsListener is a listener that listens for system metrics.
 type SystemMetricsListener struct {
     nextTriggerInstant time.Time
+    Context   any
     Cooldown  time.Duration
-    Condition func(SystemMetrics) bool // condition to trigger the callback
-    Callback  func(SystemMetrics)      // callback function if the condition met, should be non-blocking.
+    Condition func(SystemMetrics, *SystemMetricsListener) bool // condition to trigger the callback
+    Callback  func(SystemMetrics, *SystemMetricsListener)      // callback function if the condition met, should be non-blocking.
 }
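
This is the API change that drives the rest of the diff: Condition and Callback now receive the listener itself, so per-listener state can live in the exported Context field (an any, type-asserted by its owner) instead of in captured variables. Call sites that need no state simply ignore the second parameter, as the sealWorker above does.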
// RegisterSystemMetricsListener registers a listener into global default systemMetricsWatcher. // RegisterSystemMetricsListener registers a listener into global default systemMetricsWatcher.
@@ -63,10 +64,10 @@ func getSystemMetricsWatcher() *SystemMericsWatcher {
     logger := log.With(log.FieldComponent("system-metrics"))
     warningLoggerListener := &SystemMetricsListener{
         Cooldown: 1 * time.Minute,
-        Condition: func(stats SystemMetrics) bool {
+        Condition: func(stats SystemMetrics, listener *SystemMetricsListener) bool {
             return stats.UsedRatio() > 0.9
         },
-        Callback: func(sm SystemMetrics) {
+        Callback: func(sm SystemMetrics, listener *SystemMetricsListener) {
             logger.Warn("memory used ratio is extremely high", zap.String("memory", sm.String()), zap.Float64("usedRatio", sm.UsedRatio()))
         },
     }
@@ -150,8 +151,8 @@ func (w *SystemMericsWatcher) updateMetrics() {
             // cool down.
             continue
         }
-        if l.Condition(stats) {
-            l.Callback(stats)
+        if l.Condition(stats, l) {
+            l.Callback(stats, l)
             l.nextTriggerInstant = now.Add(l.Cooldown)
         }
     }
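
The cooldown gate above means a callback fires at most once per Cooldown window even while Condition stays true; for the GC listener (Cooldown 15s, Condition always true) that makes the CPU check an effectively 15-second poll. A re-statement of the gate outside the watcher (nextTrigger stands in for the unexported nextTriggerInstant; maybeFire is illustrative):

import "time"

var nextTrigger time.Time // zero value: the first sample always passes the gate

// maybeFire runs callback at most once per cooldown window, and only when
// condition holds, mirroring the loop body in updateMetrics.
func maybeFire(now time.Time, cooldown time.Duration, condition func() bool, callback func()) {
    if now.Before(nextTrigger) {
        return // still cooling down
    }
    if condition() {
        callback()
        nextTrigger = now.Add(cooldown)
    }
}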


@@ -13,14 +13,19 @@ func TestListener(t *testing.T) {
     called := atomic.NewInt32(0)
     l := &SystemMetricsListener{
         Cooldown: 100 * time.Millisecond,
-        Condition: func(stats SystemMetrics) bool {
+        Context:  false,
+        Condition: func(stats SystemMetrics, listener *SystemMetricsListener) bool {
             assert.NotZero(t, stats.UsedMemoryBytes)
             assert.NotZero(t, stats.TotalMemoryBytes)
             assert.NotZero(t, stats.UsedRatio())
             assert.NotEmpty(t, stats.String())
+            assert.False(t, listener.Context.(bool))
+            listener.Context = true
             return true
         },
-        Callback: func(sm SystemMetrics) {
+        Callback: func(sm SystemMetrics, listener *SystemMetricsListener) {
+            ctx := listener.Context.(bool)
+            assert.True(t, ctx)
             assert.NotZero(t, sm.UsedMemoryBytes)
             assert.NotZero(t, sm.TotalMemoryBytes)
             assert.NotZero(t, sm.UsedRatio())
@@ -37,6 +42,7 @@ func TestListener(t *testing.T) {
     l2 := &SystemMetricsListener{
         Cooldown:  100 * time.Millisecond,
+        Context:   false,
         Condition: l.Condition,
         Callback:  l.Callback,
     }


@@ -4019,13 +4019,14 @@ type dataCoordConfig struct {
     LevelZeroCompactionTriggerDeltalogMaxNum ParamItem `refreshable:"true"`

     // Garbage Collection
     EnableGarbageCollection    ParamItem `refreshable:"false"`
     GCInterval                 ParamItem `refreshable:"false"`
     GCMissingTolerance         ParamItem `refreshable:"false"`
     GCDropTolerance            ParamItem `refreshable:"false"`
     GCRemoveConcurrent         ParamItem `refreshable:"false"`
     GCScanIntervalInHour       ParamItem `refreshable:"false"`
+    GCSlowDownCPUUsageThreshold ParamItem `refreshable:"false"`
     EnableActiveStandby        ParamItem `refreshable:"false"`

     BindIndexNodeMode ParamItem `refreshable:"false"`
     IndexNodeAddress  ParamItem `refreshable:"false"`
@@ -4703,6 +4704,15 @@ During compaction, the size of segment # of rows is able to exceed segment max #
     }
     p.GCScanIntervalInHour.Init(base.mgr)

+    p.GCSlowDownCPUUsageThreshold = ParamItem{
+        Key:          "dataCoord.gc.slowDownCPUUsageThreshold",
+        Version:      "2.6.0",
+        DefaultValue: "0.6",
+        Doc:          "The CPU usage threshold at which the garbage collection will be slowed down",
+        Export:       true,
+    }
+    p.GCSlowDownCPUUsageThreshold.Init(base.mgr)
+
     // Do not set this to incredible small value, make sure this to be more than 10 minutes at least
     p.GCMissingTolerance = ParamItem{
         Key: "dataCoord.gc.missingTolerance",


@@ -527,6 +527,9 @@ func TestComponentParam(t *testing.T) {
     params.Save("datacoord.gracefulStopTimeout", "100")
     assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
+    assert.Equal(t, 0.6, Params.GCSlowDownCPUUsageThreshold.GetAsFloat())
+    params.Save("dataCoord.gc.slowDownCPUUsageThreshold", "0.5")
+    assert.Equal(t, 0.5, Params.GCSlowDownCPUUsageThreshold.GetAsFloat())
     params.Save("dataCoord.compaction.gcInterval", "100")
     assert.Equal(t, float64(100), Params.CompactionGCIntervalInSeconds.GetAsDuration(time.Second).Seconds())
     params.Save("dataCoord.compaction.dropTolerance", "100")


@@ -98,7 +98,7 @@ func (s *SealSuite) TestSealByTotalGrowingSegmentsSize() {
     var segments []*datapb.SegmentInfo
     segments, err = c.ShowSegments(collectionName)
     s.NoError(err)
-    s.NotEmpty(segments)
+    // segment may be in growing state or can not be seen at meta right after insert.
     flushedSegments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool {
         return segment.GetState() == commonpb.SegmentState_Flushed
     })