mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Add some logs in rocksmq retention (#7403)
Signed-off-by: fishpenguin <kun.yu@zilliz.com>
This commit is contained in:
parent
d4b979f6a8
commit
d5d73833b2
@ -260,8 +260,8 @@ func (ri *retentionInfo) retention() error {
|
|||||||
return nil
|
return nil
|
||||||
case t := <-ticker.C:
|
case t := <-ticker.C:
|
||||||
timeNow := t.Unix()
|
timeNow := t.Unix()
|
||||||
checkTime := RocksmqRetentionTimeInMinutes * 60 / 10
|
checkTime := RocksmqRetentionTimeInMinutes * MINUTE / 10
|
||||||
log.Debug("In ticker: ", zap.Any("ticker", timeNow))
|
log.Debug("A retention triggered by time ticker: ", zap.Any("ticker", timeNow))
|
||||||
ri.lastRetentionTime.Range(func(k, v interface{}) bool {
|
ri.lastRetentionTime.Range(func(k, v interface{}) bool {
|
||||||
if v.(int64)+checkTime < timeNow {
|
if v.(int64)+checkTime < timeNow {
|
||||||
err := ri.expiredCleanUp(k.(string))
|
err := ri.expiredCleanUp(k.(string))
|
||||||
@ -271,14 +271,6 @@ func (ri *retentionInfo) retention() error {
|
|||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
// for k, v := range ri.lastRetentionTime {
|
|
||||||
// if v+checkTime < timeNow {
|
|
||||||
// err := ri.expiredCleanUp(k)
|
|
||||||
// if err != nil {
|
|
||||||
// panic(err)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -350,7 +342,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Debug("In expiredCleanUp: ", zap.Any("topic", topic), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize))
|
log.Debug("Expired check by page info", zap.Any("topic", topic), zap.Any("pageEndID", endID), zap.Any("deletedAckedSize", deletedAckedSize))
|
||||||
|
|
||||||
pageEndID := endID
|
pageEndID := endID
|
||||||
// The end msg of the page is not expired, find the last expired msg in this page
|
// The end msg of the page is not expired, find the last expired msg in this page
|
||||||
@ -369,9 +361,10 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if endID == 0 {
|
if endID == 0 {
|
||||||
log.Debug("All messages are not expired")
|
log.Debug("All messages are not expired", zap.Any("topic", topic))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
log.Debug("Expired check by retention time", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize))
|
||||||
|
|
||||||
// Delete page message size in rocksdb_kv
|
// Delete page message size in rocksdb_kv
|
||||||
if pageInfo != nil {
|
if pageInfo != nil {
|
||||||
@ -388,6 +381,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
log.Debug("Expired check by retention size", zap.Any("topic", topic), zap.Any("new endID", endID), zap.Any("new deletedAckedSize", deletedAckedSize))
|
||||||
}
|
}
|
||||||
|
|
||||||
if pageEndID > 0 && len(pageInfo.pageEndID) > 0 {
|
if pageEndID > 0 && len(pageInfo.pageEndID) > 0 {
|
||||||
@ -403,7 +397,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
|||||||
log.Debug("Delete page info", zap.Any("topic", topic), zap.Any("pageStartID", pageStartID), zap.Any("pageEndID", pageEndID))
|
log.Debug("Delete page info", zap.Any("topic", topic), zap.Any("pageStartID", pageStartID), zap.Any("pageEndID", pageEndID))
|
||||||
if pageStartID == pageEndID {
|
if pageStartID == pageEndID {
|
||||||
pageWriteBatch.Delete([]byte(pageStartKey))
|
pageWriteBatch.Delete([]byte(pageStartKey))
|
||||||
} else {
|
} else if pageStartID < pageEndID {
|
||||||
pageWriteBatch.DeleteRange([]byte(pageStartKey), []byte(pageEndKey))
|
pageWriteBatch.DeleteRange([]byte(pageStartKey), []byte(pageEndKey))
|
||||||
}
|
}
|
||||||
ri.kv.DB.Write(gorocksdb.NewDefaultWriteOptions(), pageWriteBatch)
|
ri.kv.DB.Write(gorocksdb.NewDefaultWriteOptions(), pageWriteBatch)
|
||||||
@ -412,6 +406,7 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
|||||||
}
|
}
|
||||||
ri.pageInfo.Store(topic, pageInfo)
|
ri.pageInfo.Store(topic, pageInfo)
|
||||||
}
|
}
|
||||||
|
log.Debug("ExpiredCleanUp: ", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID), zap.Any("deletedAckedSize", deletedAckedSize))
|
||||||
|
|
||||||
// Delete acked_ts in rocksdb_kv
|
// Delete acked_ts in rocksdb_kv
|
||||||
fixedAckedTsTitle, err := constructKey(AckedTsTitle, topic)
|
fixedAckedTsTitle, err := constructKey(AckedTsTitle, topic)
|
||||||
@ -422,7 +417,9 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
|
|||||||
ackedEndIDKey := fixedAckedTsTitle + "/" + strconv.Itoa(int(endID))
|
ackedEndIDKey := fixedAckedTsTitle + "/" + strconv.Itoa(int(endID))
|
||||||
ackedTsWriteBatch := gorocksdb.NewWriteBatch()
|
ackedTsWriteBatch := gorocksdb.NewWriteBatch()
|
||||||
defer ackedTsWriteBatch.Clear()
|
defer ackedTsWriteBatch.Clear()
|
||||||
if startID == endID {
|
if startID > endID {
|
||||||
|
return nil
|
||||||
|
} else if startID == endID {
|
||||||
ackedTsWriteBatch.Delete([]byte(ackedStartIDKey))
|
ackedTsWriteBatch.Delete([]byte(ackedStartIDKey))
|
||||||
} else {
|
} else {
|
||||||
ackedTsWriteBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey))
|
ackedTsWriteBatch.DeleteRange([]byte(ackedStartIDKey), []byte(ackedEndIDKey))
|
||||||
@ -470,7 +467,6 @@ func DeleteMessages(db *gorocksdb.DB, topic string, startID, endID UniqueID) err
|
|||||||
|
|
||||||
writeBatch := gorocksdb.NewWriteBatch()
|
writeBatch := gorocksdb.NewWriteBatch()
|
||||||
defer writeBatch.Clear()
|
defer writeBatch.Clear()
|
||||||
log.Debug("Delete messages by range", zap.Any("topic", topic), zap.Any("startID", startID), zap.Any("endID", endID))
|
|
||||||
if startID == endID {
|
if startID == endID {
|
||||||
writeBatch.Delete([]byte(startKey))
|
writeBatch.Delete([]byte(startKey))
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user