diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index 4ea6accc14..2d216c5905 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -160,7 +160,7 @@ func (gc *garbageCollector) work(ctx context.Context) { gc.recycleChannelCPMeta(ctx) gc.recycleUnusedIndexes(ctx) gc.recycleUnusedSegIndexes(ctx) - gc.recycleUnusedAnalyzeFiles() + gc.recycleUnusedAnalyzeFiles(ctx) }) }() go func() { @@ -733,10 +733,8 @@ func (gc *garbageCollector) getAllIndexFilesOfIndex(segmentIndex *model.SegmentI } // recycleUnusedAnalyzeFiles is used to delete those analyze stats files that no longer exist in the meta. -func (gc *garbageCollector) recycleUnusedAnalyzeFiles() { +func (gc *garbageCollector) recycleUnusedAnalyzeFiles(ctx context.Context) { log.Info("start recycleUnusedAnalyzeFiles") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() startTs := time.Now() prefix := path.Join(gc.option.cli.RootPath(), common.AnalyzeStatsPath) + "/" // list dir first @@ -751,6 +749,11 @@ func (gc *garbageCollector) recycleUnusedAnalyzeFiles() { } log.Info("recycleUnusedAnalyzeFiles, finish list object", zap.Duration("time spent", time.Since(startTs)), zap.Int("task ids", len(keys))) for _, key := range keys { + if ctx.Err() != nil { + // process canceled + return + } + log.Debug("analyze keys", zap.String("key", key)) taskID, err := parseBuildIDFromFilePath(key) if err != nil { @@ -784,6 +787,10 @@ func (gc *garbageCollector) recycleUnusedAnalyzeFiles() { zap.Int64("taskID", taskID), zap.Int64("current version", task.Version)) var i int64 for i = 0; i < task.Version; i++ { + if ctx.Err() != nil { + // process canceled. + return + } removePrefix := prefix + fmt.Sprintf("%d/", task.Version) if err := gc.option.cli.RemoveWithPrefix(ctx, removePrefix); err != nil { log.Warn("garbageCollector recycleUnusedAnalyzeFiles remove files with prefix failed",