mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-28 22:45:26 +08:00
Move topic lock to the front of final delete in retention expired cleanup (#11076)
Signed-off-by: fishpenguin <kun.yu@zilliz.com>
This commit is contained in:
parent
a8722797fe
commit
ea6cd7d27c
@ -155,17 +155,6 @@ func (ri *retentionInfo) Stop() {
|
||||
|
||||
func (ri *retentionInfo) newExpiredCleanUp(topic string) error {
|
||||
log.Debug("Timeticker triggers an expiredCleanUp task for topic: " + topic)
|
||||
ll, ok := topicMu.Load(topic)
|
||||
if !ok {
|
||||
return fmt.Errorf("topic name = %s not exist", topic)
|
||||
}
|
||||
lock, ok := ll.(*sync.Mutex)
|
||||
if !ok {
|
||||
return fmt.Errorf("get mutex failed, topic name = %s", topic)
|
||||
}
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
var deletedAckedSize int64 = 0
|
||||
var startID UniqueID
|
||||
var endID UniqueID
|
||||
@ -346,7 +335,25 @@ func (ri *retentionInfo) newExpiredCleanUp(topic string) error {
|
||||
writeBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey))
|
||||
}
|
||||
|
||||
newAckedSize := totalAckedSize - deletedAckedSize
|
||||
ll, ok := topicMu.Load(topic)
|
||||
if !ok {
|
||||
return fmt.Errorf("topic name = %s not exist", topic)
|
||||
}
|
||||
lock, ok := ll.(*sync.Mutex)
|
||||
if !ok {
|
||||
return fmt.Errorf("get mutex failed, topic name = %s", topic)
|
||||
}
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
currentAckedSizeVal, err := ri.kv.Load(ackedSizeKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
currentAckedSize, err := strconv.ParseInt(currentAckedSizeVal, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newAckedSize := currentAckedSize - deletedAckedSize
|
||||
writeBatch.Put([]byte(ackedSizeKey), []byte(strconv.FormatInt(newAckedSize, 10)))
|
||||
|
||||
err = DeleteMessages(ri.db, topic, startID, endID)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user