diff --git a/internal/indexcoord/garbage_collector.go b/internal/indexcoord/garbage_collector.go index b595e4b9ef..dbdbab179e 100644 --- a/internal/indexcoord/garbage_collector.go +++ b/internal/indexcoord/garbage_collector.go @@ -176,6 +176,8 @@ func (gc *garbageCollector) recycleSegIndexesMeta() { if meta.IsDeleted || gc.metaTable.IsIndexDeleted(meta.CollectionID, meta.IndexID) { if meta.NodeID != 0 { // wait for releasing reference lock + log.Ctx(gc.ctx).Warn("nodeID is not zero, wait for releasing reference lock", + zap.Int64("buildID", meta.BuildID), zap.Int64("segID", meta.SegmentID), zap.Int64("nodeID", meta.NodeID)) continue } if err := gc.metaTable.RemoveSegmentIndex(meta.CollectionID, meta.PartitionID, meta.SegmentID, meta.BuildID); err != nil { diff --git a/internal/storage/minio_chunk_manager.go b/internal/storage/minio_chunk_manager.go index 42295dbc16..8259069b83 100644 --- a/internal/storage/minio_chunk_manager.go +++ b/internal/storage/minio_chunk_manager.go @@ -26,6 +26,8 @@ import ( "strings" "time" + "golang.org/x/sync/errgroup" + "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "go.uber.org/zap" @@ -365,10 +367,31 @@ func (mcm *MinioChunkManager) MultiRemove(ctx context.Context, keys []string) er // RemoveWithPrefix removes all objects with the same prefix @prefix from minio. func (mcm *MinioChunkManager) RemoveWithPrefix(ctx context.Context, prefix string) error { objects := mcm.listMinioObjects(ctx, mcm.bucketName, minio.ListObjectsOptions{Prefix: prefix, Recursive: true}) - for rErr := range mcm.removeMinioObjects(ctx, mcm.bucketName, objects, minio.RemoveObjectsOptions{GovernanceBypass: false}) { - if rErr.Err != nil { - log.Warn("failed to remove objects", zap.String("prefix", prefix), zap.Error(rErr.Err)) - return rErr.Err + i := 0 + maxGoroutine := 10 + removeKeys := make([]string, 0, len(objects)) + for object := range objects { + if object.Err != nil { + return object.Err + } + removeKeys = append(removeKeys, object.Key) + } + for i < len(removeKeys) { + runningGroup, groupCtx := errgroup.WithContext(ctx) + for j := 0; j < maxGoroutine && i < len(removeKeys); j++ { + key := removeKeys[i] + runningGroup.Go(func() error { + err := mcm.removeMinioObject(groupCtx, mcm.bucketName, key, minio.RemoveObjectOptions{}) + if err != nil { + log.Warn("failed to remove object", zap.String("path", key), zap.Error(err)) + return err + } + return nil + }) + i++ + } + if err := runningGroup.Wait(); err != nil { + return err } } return nil