From a9aaa86193c152e52dfde69a2b59fd994e14d880 Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 10 Jun 2025 10:20:35 +0800 Subject: [PATCH] enhance: [StorageV2] Pass bucket name for compaction readers (#42607) Related to #39173 Like logic in #41919, storage v2 fs shall use complete paths with bucketName prefix to be compatible with its definition. This PR fills bucket name from config when creating reader for compaction tasks. NOTE: the bucket name shall be read from task params config for compaction task pooling. Signed-off-by: Congqi Xia --- .../compactor/clustering_compactor.go | 31 +++++++++++++++---- internal/datanode/compactor/merge_sort.go | 12 ++++++- internal/datanode/compactor/mix_compactor.go | 11 ++++++- 3 files changed, 46 insertions(+), 8 deletions(-) diff --git a/internal/datanode/compactor/clustering_compactor.go b/internal/datanode/compactor/clustering_compactor.go index 560d5ecbcf..c01f022563 100644 --- a/internal/datanode/compactor/clustering_compactor.go +++ b/internal/datanode/compactor/clustering_compactor.go @@ -587,9 +587,18 @@ func (t *clusteringCompactionTask) mappingSegment( return merr.WrapErrIllegalCompactionPlan() } - rr, err := storage.NewBinlogRecordReader(ctx, segment.GetFieldBinlogs(), t.plan.Schema, storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) { - return t.binlogIO.Download(ctx, paths) - }), storage.WithVersion(segment.StorageVersion), storage.WithBufferSize(t.memoryBufferSize)) + // TODO bucketName shall be passed via StorageConfig like index/stats task + bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue() + + rr, err := storage.NewBinlogRecordReader(ctx, + segment.GetFieldBinlogs(), + t.plan.Schema, + storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) { + return t.binlogIO.Download(ctx, paths) + }), + storage.WithVersion(segment.StorageVersion), storage.WithBufferSize(t.memoryBufferSize), + storage.WithBucketName(bucketName), + ) if err != nil { log.Warn("new binlog record reader wrong", zap.Error(err)) return err @@ -856,9 +865,19 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment( expiredFilter := compaction.NewEntityFilter(nil, t.plan.GetCollectionTtl(), t.currentTime) - rr, err := storage.NewBinlogRecordReader(ctx, segment.GetFieldBinlogs(), t.plan.Schema, storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) { - return t.binlogIO.Download(ctx, paths) - }), storage.WithVersion(segment.StorageVersion), storage.WithBufferSize(t.memoryBufferSize)) + // TODO bucketName shall be passed via StorageConfig like index/stats task + bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue() + + rr, err := storage.NewBinlogRecordReader(ctx, + segment.GetFieldBinlogs(), + t.plan.Schema, + storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) { + return t.binlogIO.Download(ctx, paths) + }), + storage.WithVersion(segment.StorageVersion), + storage.WithBufferSize(t.memoryBufferSize), + storage.WithBucketName(bucketName), + ) if err != nil { log.Warn("new binlog record reader wrong", zap.Error(err)) return make(map[interface{}]int64), err diff --git a/internal/datanode/compactor/merge_sort.go b/internal/datanode/compactor/merge_sort.go index 13290a66b4..049911f2ce 100644 --- a/internal/datanode/compactor/merge_sort.go +++ b/internal/datanode/compactor/merge_sort.go @@ -18,6 +18,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/timerecord" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -56,10 +57,19 @@ func mergeSortMultipleSegments(ctx context.Context, return nil, err } + // TODO bucketName shall be passed via StorageConfig like index/stats task + bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue() + segmentReaders := make([]storage.RecordReader, len(binlogs)) segmentFilters := make([]compaction.EntityFilter, len(binlogs)) for i, s := range binlogs { - reader, err := storage.NewBinlogRecordReader(ctx, s.GetFieldBinlogs(), plan.GetSchema(), storage.WithDownloader(binlogIO.Download), storage.WithVersion(s.StorageVersion)) + reader, err := storage.NewBinlogRecordReader(ctx, + s.GetFieldBinlogs(), + plan.GetSchema(), + storage.WithDownloader(binlogIO.Download), + storage.WithVersion(s.StorageVersion), + storage.WithBucketName(bucketName), + ) if err != nil { return nil, err } diff --git a/internal/datanode/compactor/mix_compactor.go b/internal/datanode/compactor/mix_compactor.go index 6534f47c36..310e23ed54 100644 --- a/internal/datanode/compactor/mix_compactor.go +++ b/internal/datanode/compactor/mix_compactor.go @@ -217,7 +217,16 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context, } entityFilter := compaction.NewEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime) - reader, err := storage.NewBinlogRecordReader(ctx, seg.GetFieldBinlogs(), t.plan.GetSchema(), storage.WithDownloader(t.binlogIO.Download), storage.WithVersion(seg.GetStorageVersion())) + // TODO bucketName shall be passed via StorageConfig like index/stats task + bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue() + + reader, err := storage.NewBinlogRecordReader(ctx, + seg.GetFieldBinlogs(), + t.plan.GetSchema(), + storage.WithDownloader(t.binlogIO.Download), + storage.WithVersion(seg.GetStorageVersion()), + storage.WithBucketName(bucketName), + ) if err != nil { log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err)) return