From c36b54cb5782ee571e2c828140fff760ee399a1e Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 25 Apr 2024 16:07:26 +0800 Subject: [PATCH] enhance: [2.3] Use different interval for gc scan (#31363) (#32551) Cherry-pick from master pr: #31363 See also #31362 This PR makes the datacoord garbage collection scan operation use a different interval than the other operations. This interval is a newly added param item whose default value is 7*24 hours. Signed-off-by: Congqi Xia --- configs/milvus.yaml | 1 + internal/datacoord/garbage_collector.go | 15 ++++++++++++--- internal/datacoord/garbage_collector_test.go | 12 ++++++++++++ internal/datacoord/server.go | 1 + pkg/util/paramtable/component_param.go | 10 ++++++++++ 5 files changed, 36 insertions(+), 3 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 25ee250f99..78043b76f7 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -434,6 +434,7 @@ dataCoord: enableGarbageCollection: true gc: interval: 3600 # gc interval in seconds + scanInterval: 168 # gc residual file scan interval in hours missingTolerance: 3600 # file meta missing tolerance duration in seconds, 3600 dropTolerance: 10800 # file belongs to dropped entity tolerance duration in seconds. 
10800 enableActiveStandby: false diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index a040e5d74a..0d3078f4a8 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -50,6 +50,7 @@ type GcOption struct { checkInterval time.Duration // each interval missingTolerance time.Duration // key missing in meta tolerance time dropTolerance time.Duration // dropped segment related key tolerance time + scanInterval time.Duration // interval for scanning residue left by interrupted log writes removeLogPool *conc.Pool[struct{}] } @@ -76,8 +77,12 @@ type gcCmd struct { // newGarbageCollector create garbage collector with meta and option func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageCollector { - log.Info("GC with option", zap.Bool("enabled", opt.enabled), zap.Duration("interval", opt.checkInterval), - zap.Duration("missingTolerance", opt.missingTolerance), zap.Duration("dropTolerance", opt.dropTolerance)) + log.Info("GC with option", + zap.Bool("enabled", opt.enabled), + zap.Duration("interval", opt.checkInterval), + zap.Duration("scanInterval", opt.scanInterval), + zap.Duration("missingTolerance", opt.missingTolerance), + zap.Duration("dropTolerance", opt.dropTolerance)) opt.removeLogPool = conc.NewPool[struct{}](Params.DataCoordCfg.GCRemoveConcurrent.GetAsInt(), conc.WithExpiryDuration(time.Minute)) return &garbageCollector{ meta: meta, @@ -144,6 +149,8 @@ func (gc *garbageCollector) work() { defer gc.wg.Done() ticker := time.NewTicker(gc.option.checkInterval) defer ticker.Stop() + scanTicker := time.NewTicker(gc.option.scanInterval) + defer scanTicker.Stop() for { select { case <-ticker.C: @@ -154,8 +161,10 @@ func (gc *garbageCollector) work() { gc.clearEtcd() gc.recycleUnusedIndexes() gc.recycleUnusedSegIndexes() - gc.scan() gc.recycleUnusedIndexFiles() + case <-scanTicker.C: + log.Info("Garbage collector start to scan interrupted write residue") + gc.scan() case 
cmd := <-gc.cmdCh: switch cmd.cmdType { case datapb.GcCommand_Pause: diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 54a7fb5b97..909daf6df5 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -67,6 +67,7 @@ func Test_garbageCollector_basic(t *testing.T) { cli: cli, enabled: true, checkInterval: time.Millisecond * 10, + scanInterval: time.Hour * 7 * 24, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, }) @@ -83,6 +84,7 @@ func Test_garbageCollector_basic(t *testing.T) { cli: nil, enabled: true, checkInterval: time.Millisecond * 10, + scanInterval: time.Hour * 7 * 24, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, }) @@ -119,6 +121,7 @@ func Test_garbageCollector_scan(t *testing.T) { cli: cli, enabled: true, checkInterval: time.Minute * 30, + scanInterval: time.Hour * 7 * 24, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, }) @@ -136,6 +139,7 @@ func Test_garbageCollector_scan(t *testing.T) { cli: cli, enabled: true, checkInterval: time.Minute * 30, + scanInterval: time.Hour * 7 * 24, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, }) @@ -161,6 +165,7 @@ func Test_garbageCollector_scan(t *testing.T) { cli: cli, enabled: true, checkInterval: time.Minute * 30, + scanInterval: time.Hour * 7 * 24, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, }) @@ -189,6 +194,7 @@ func Test_garbageCollector_scan(t *testing.T) { cli: cli, enabled: true, checkInterval: time.Minute * 30, + scanInterval: time.Hour * 7 * 24, missingTolerance: time.Hour * 24, dropTolerance: 0, }) @@ -205,6 +211,7 @@ func Test_garbageCollector_scan(t *testing.T) { cli: cli, enabled: true, checkInterval: time.Minute * 30, + scanInterval: time.Hour * 7 * 24, missingTolerance: 0, dropTolerance: 0, }) @@ -226,6 +233,7 @@ func Test_garbageCollector_scan(t *testing.T) { cli: cli, enabled: true, checkInterval: 
time.Minute * 30, + scanInterval: time.Hour * 7 * 24, missingTolerance: 0, dropTolerance: 0, }) @@ -1480,6 +1488,7 @@ func (s *GarbageCollectorSuite) TestPauseResume() { cli: s.cli, enabled: false, checkInterval: time.Millisecond * 10, + scanInterval: time.Hour * 24 * 7, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, }) @@ -1501,6 +1510,7 @@ func (s *GarbageCollectorSuite) TestPauseResume() { cli: s.cli, enabled: true, checkInterval: time.Millisecond * 10, + scanInterval: time.Hour * 7 * 24, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, }) @@ -1525,6 +1535,7 @@ func (s *GarbageCollectorSuite) TestPauseResume() { cli: s.cli, enabled: true, checkInterval: time.Millisecond * 10, + scanInterval: time.Hour * 7 * 24, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, }) @@ -1552,6 +1563,7 @@ func (s *GarbageCollectorSuite) TestPauseResume() { cli: s.cli, enabled: true, checkInterval: time.Millisecond * 10, + scanInterval: time.Hour * 7 * 24, missingTolerance: time.Hour * 24, dropTolerance: time.Hour * 24, }) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index aa0867d5d2..c2eb18fdad 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -487,6 +487,7 @@ func (s *Server) initGarbageCollection(cli storage.ChunkManager) { cli: cli, enabled: Params.DataCoordCfg.EnableGarbageCollection.GetAsBool(), checkInterval: Params.DataCoordCfg.GCInterval.GetAsDuration(time.Second), + scanInterval: Params.DataCoordCfg.GCScanIntervalInHour.GetAsDuration(time.Hour), missingTolerance: Params.DataCoordCfg.GCMissingTolerance.GetAsDuration(time.Second), dropTolerance: Params.DataCoordCfg.GCDropTolerance.GetAsDuration(time.Second), }) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 6b249fc2f2..c0324381e8 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2344,6 +2344,7 @@ type dataCoordConfig 
struct { GCMissingTolerance ParamItem `refreshable:"false"` GCDropTolerance ParamItem `refreshable:"false"` GCRemoveConcurrent ParamItem `refreshable:"false"` + GCScanIntervalInHour ParamItem `refreshable:"false"` EnableActiveStandby ParamItem `refreshable:"false"` BindIndexNodeMode ParamItem `refreshable:"false"` @@ -2640,6 +2641,15 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.GCInterval.Init(base.mgr) + p.GCScanIntervalInHour = ParamItem{ + Key: "dataCoord.gc.scanInterval", + Version: "2.4.0", + DefaultValue: "168", // hours, default 7 * 24 hours + Doc: "garbage collection scan residue interval in hours", + Export: true, + } + p.GCScanIntervalInHour.Init(base.mgr) + // Do not set this to incredible small value, make sure this to be more than 10 minutes at least p.GCMissingTolerance = ParamItem{ Key: "dataCoord.gc.missingTolerance",