mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-30 23:45:28 +08:00
Stop retention gracefully (#10367)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
fd95ad8e4a
commit
c2c2036154
@ -202,7 +202,7 @@ func (rmq *rocksmq) Close() {
|
||||
|
||||
func (rmq *rocksmq) stopRetention() {
|
||||
if rmq.retentionInfo != nil {
|
||||
rmq.retentionInfo.cancel()
|
||||
rmq.retentionInfo.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -12,7 +12,6 @@
|
||||
package rocksmq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
@ -55,8 +54,6 @@ type topicAckedInfo struct {
|
||||
}
|
||||
|
||||
type retentionInfo struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
topics []string
|
||||
// pageInfo map[string]*topicPageInfo
|
||||
pageInfo sync.Map
|
||||
@ -70,6 +67,10 @@ type retentionInfo struct {
|
||||
|
||||
kv *rocksdbkv.RocksdbKV
|
||||
db *gorocksdb.DB
|
||||
|
||||
closeCh chan struct{}
|
||||
closeWg sync.WaitGroup
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
// Interface LoadWithPrefix() in rocksdbkv needs to close db instance first and then reopen,
|
||||
@ -99,10 +100,7 @@ func prefixLoad(db *gorocksdb.DB, prefix string) ([]string, []string, error) {
|
||||
}
|
||||
|
||||
func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInfo, error) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ri := &retentionInfo{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
topics: make([]string, 0),
|
||||
pageInfo: sync.Map{},
|
||||
ackedInfo: sync.Map{},
|
||||
@ -110,6 +108,8 @@ func initRetentionInfo(kv *rocksdbkv.RocksdbKV, db *gorocksdb.DB) (*retentionInf
|
||||
mutex: sync.RWMutex{},
|
||||
kv: kv,
|
||||
db: db,
|
||||
closeCh: make(chan struct{}),
|
||||
closeWg: sync.WaitGroup{},
|
||||
}
|
||||
// Get topic from topic begin id
|
||||
beginIDKeys, _, err := ri.kv.LoadWithPrefix(TopicBeginIDTitle)
|
||||
@ -137,6 +137,7 @@ func (ri *retentionInfo) startRetentionInfo() {
|
||||
// }
|
||||
// wg.Wait()
|
||||
// log.Debug("Finish load retention info, start retention")
|
||||
ri.closeWg.Add(1)
|
||||
go ri.retention()
|
||||
}
|
||||
|
||||
@ -279,10 +280,11 @@ func (ri *retentionInfo) retention() error {
|
||||
log.Debug("Rocksmq retention goroutine start!")
|
||||
// Do retention check every 6s
|
||||
ticker := time.NewTicker(time.Duration(atomic.LoadInt64(&TickerTimeInSeconds) * int64(time.Second)))
|
||||
defer ri.closeWg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ri.ctx.Done():
|
||||
case <-ri.closeCh:
|
||||
log.Debug("Rocksmq retention finish!")
|
||||
return nil
|
||||
case t := <-ticker.C:
|
||||
@ -323,6 +325,13 @@ func (ri *retentionInfo) retention() error {
|
||||
}
|
||||
}
|
||||
|
||||
func (ri *retentionInfo) Stop() {
|
||||
ri.closeOnce.Do(func() {
|
||||
close(ri.closeCh)
|
||||
ri.closeWg.Wait()
|
||||
})
|
||||
}
|
||||
|
||||
func (ri *retentionInfo) newExpiredCleanUp(topic string) error {
|
||||
log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic)
|
||||
ll, ok := topicMu.Load(topic)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user