mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
enhance: the datacoord gc should fast quitable (#35050)
issue: #35049 Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
parent
e62f6b2f4e
commit
a495754034
@ -160,7 +160,7 @@ func (gc *garbageCollector) work(ctx context.Context) {
|
||||
gc.recycleChannelCPMeta(ctx)
|
||||
gc.recycleUnusedIndexes(ctx)
|
||||
gc.recycleUnusedSegIndexes(ctx)
|
||||
gc.recycleUnusedAnalyzeFiles()
|
||||
gc.recycleUnusedAnalyzeFiles(ctx)
|
||||
})
|
||||
}()
|
||||
go func() {
|
||||
@ -733,10 +733,8 @@ func (gc *garbageCollector) getAllIndexFilesOfIndex(segmentIndex *model.SegmentI
|
||||
}
|
||||
|
||||
// recycleUnusedAnalyzeFiles is used to delete those analyze stats files that no longer exist in the meta.
|
||||
func (gc *garbageCollector) recycleUnusedAnalyzeFiles() {
|
||||
func (gc *garbageCollector) recycleUnusedAnalyzeFiles(ctx context.Context) {
|
||||
log.Info("start recycleUnusedAnalyzeFiles")
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
startTs := time.Now()
|
||||
prefix := path.Join(gc.option.cli.RootPath(), common.AnalyzeStatsPath) + "/"
|
||||
// list dir first
|
||||
@ -751,6 +749,11 @@ func (gc *garbageCollector) recycleUnusedAnalyzeFiles() {
|
||||
}
|
||||
log.Info("recycleUnusedAnalyzeFiles, finish list object", zap.Duration("time spent", time.Since(startTs)), zap.Int("task ids", len(keys)))
|
||||
for _, key := range keys {
|
||||
if ctx.Err() != nil {
|
||||
// process canceled
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug("analyze keys", zap.String("key", key))
|
||||
taskID, err := parseBuildIDFromFilePath(key)
|
||||
if err != nil {
|
||||
@ -784,6 +787,10 @@ func (gc *garbageCollector) recycleUnusedAnalyzeFiles() {
|
||||
zap.Int64("taskID", taskID), zap.Int64("current version", task.Version))
|
||||
var i int64
|
||||
for i = 0; i < task.Version; i++ {
|
||||
if ctx.Err() != nil {
|
||||
// process canceled.
|
||||
return
|
||||
}
|
||||
removePrefix := prefix + fmt.Sprintf("%d/", task.Version)
|
||||
if err := gc.option.cli.RemoveWithPrefix(ctx, removePrefix); err != nil {
|
||||
log.Warn("garbageCollector recycleUnusedAnalyzeFiles remove files with prefix failed",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user