From c2c2036154ea9e91510b63644dfacb8fc0382ba5 Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 21 Oct 2021 19:50:26 +0800 Subject: [PATCH] Stop retention gracefully (#10367) Signed-off-by: Congqi Xia --- .../rocksmq/server/rocksmq/rocksmq_impl.go | 2 +- .../server/rocksmq/rocksmq_retention.go | 23 +++++++++++++------ 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go index 4da94591d1..3845def47f 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_impl.go @@ -202,7 +202,7 @@ func (rmq *rocksmq) Close() { func (rmq *rocksmq) stopRetention() { if rmq.retentionInfo != nil { - rmq.retentionInfo.cancel() + rmq.retentionInfo.Stop() } } diff --git a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go index 42e428110c..76d61a8307 100644 --- a/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go +++ b/internal/util/rocksmq/server/rocksmq/rocksmq_retention.go @@ -12,7 +12,6 @@ package rocksmq import ( - "context" "errors" "fmt" "math" @@ -55,8 +54,6 @@ type topicAckedInfo struct { } type retentionInfo struct { - ctx context.Context - cancel context.CancelFunc topics []string // pageInfo map[string]*topicPageInfo pageInfo sync.Map @@ -70,6 +67,10 @@ type retentionInfo struct { kv *rocksdbkv.RocksdbKV db *gorocksdb.DB + + closeCh chan struct{} + closeWg sync.WaitGroup + closeOnce sync.Once } // Interface LoadWithPrefix() in rocksdbkv needs to close db instance first and then reopen, @@ -99,10 +100,7 @@ func prefixLoad(db *gorocksdb.DB, prefix string) ([]string, []string, error) { } func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) { - ctx, cancel := context.WithCancel(context.Background()) ri := &retentionInfo{ - ctx: ctx, - cancel: cancel, topics: make([]string, 0), pageInfo: sync.Map{}, ackedInfo: sync.Map{}, @@ -110,6 +108,8 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf mutex: sync.RWMutex{}, kv: kv, db: db, + closeCh: make(chan struct{}), + closeWg: sync.WaitGroup{}, } // Get topic from topic begin id beginIDKeys, _, err := ri.kv.LoadWithPrefix(TopicBeginIDTitle) @@ -137,6 +137,7 @@ func (ri *retentionInfo) startRetentionInfo() { // } // wg.Wait() // log.Debug("Finish load retention info, start retention") + ri.closeWg.Add(1) go ri.retention() } @@ -279,10 +280,11 @@ func (ri *retentionInfo) retention() error { log.Debug("Rocksmq retention goroutine start!") // Do retention check every 6s ticker := time.NewTicker(time.Duration(atomic.LoadInt64(&TickerTimeInSeconds) * int64(time.Second))) + defer ri.closeWg.Done() for { select { - case <-ri.ctx.Done(): + case <-ri.closeCh: log.Debug("Rocksmq retention finish!") return nil case t := <-ticker.C: @@ -323,6 +325,13 @@ func (ri *retentionInfo) retention() error { } } +func (ri *retentionInfo) Stop() { + ri.closeOnce.Do(func() { + close(ri.closeCh) + ri.closeWg.Wait() + }) +} + func (ri *retentionInfo) newExpiredCleanUp(topic string) error { log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic) ll, ok := topicMu.Load(topic)