diff --git a/internal/datanode/compactor/clustering_compactor.go b/internal/datanode/compactor/clustering_compactor.go index 560d5ecbcf..c01f022563 100644 --- a/internal/datanode/compactor/clustering_compactor.go +++ b/internal/datanode/compactor/clustering_compactor.go @@ -587,9 +587,18 @@ func (t *clusteringCompactionTask) mappingSegment( return merr.WrapErrIllegalCompactionPlan() } - rr, err := storage.NewBinlogRecordReader(ctx, segment.GetFieldBinlogs(), t.plan.Schema, storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) { - return t.binlogIO.Download(ctx, paths) - }), storage.WithVersion(segment.StorageVersion), storage.WithBufferSize(t.memoryBufferSize)) + // TODO bucketName shall be passed via StorageConfig like index/stats task + bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue() + + rr, err := storage.NewBinlogRecordReader(ctx, + segment.GetFieldBinlogs(), + t.plan.Schema, + storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) { + return t.binlogIO.Download(ctx, paths) + }), + storage.WithVersion(segment.StorageVersion), storage.WithBufferSize(t.memoryBufferSize), + storage.WithBucketName(bucketName), + ) if err != nil { log.Warn("new binlog record reader wrong", zap.Error(err)) return err @@ -856,9 +865,19 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment( expiredFilter := compaction.NewEntityFilter(nil, t.plan.GetCollectionTtl(), t.currentTime) - rr, err := storage.NewBinlogRecordReader(ctx, segment.GetFieldBinlogs(), t.plan.Schema, storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) { - return t.binlogIO.Download(ctx, paths) - }), storage.WithVersion(segment.StorageVersion), storage.WithBufferSize(t.memoryBufferSize)) + // TODO bucketName shall be passed via StorageConfig like index/stats task + bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue() + + rr, err := storage.NewBinlogRecordReader(ctx, + segment.GetFieldBinlogs(), + t.plan.Schema, + storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) { + return t.binlogIO.Download(ctx, paths) + }), + storage.WithVersion(segment.StorageVersion), + storage.WithBufferSize(t.memoryBufferSize), + storage.WithBucketName(bucketName), + ) if err != nil { log.Warn("new binlog record reader wrong", zap.Error(err)) return make(map[interface{}]int64), err diff --git a/internal/datanode/compactor/merge_sort.go b/internal/datanode/compactor/merge_sort.go index 13290a66b4..049911f2ce 100644 --- a/internal/datanode/compactor/merge_sort.go +++ b/internal/datanode/compactor/merge_sort.go @@ -18,6 +18,7 @@ import ( "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/timerecord" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) @@ -56,10 +57,19 @@ func mergeSortMultipleSegments(ctx context.Context, return nil, err } + // TODO bucketName shall be passed via StorageConfig like index/stats task + bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue() + segmentReaders := make([]storage.RecordReader, len(binlogs)) segmentFilters := make([]compaction.EntityFilter, len(binlogs)) for i, s := range binlogs { - reader, err := storage.NewBinlogRecordReader(ctx, s.GetFieldBinlogs(), plan.GetSchema(), storage.WithDownloader(binlogIO.Download), storage.WithVersion(s.StorageVersion)) + reader, err := storage.NewBinlogRecordReader(ctx, + s.GetFieldBinlogs(), + plan.GetSchema(), + storage.WithDownloader(binlogIO.Download), + storage.WithVersion(s.StorageVersion), + storage.WithBucketName(bucketName), + ) if err != nil { return nil, err } diff --git a/internal/datanode/compactor/mix_compactor.go b/internal/datanode/compactor/mix_compactor.go index 6534f47c36..310e23ed54 100644 --- a/internal/datanode/compactor/mix_compactor.go +++ b/internal/datanode/compactor/mix_compactor.go @@ -217,7 +217,16 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context, } entityFilter := compaction.NewEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime) - reader, err := storage.NewBinlogRecordReader(ctx, seg.GetFieldBinlogs(), t.plan.GetSchema(), storage.WithDownloader(t.binlogIO.Download), storage.WithVersion(seg.GetStorageVersion())) + // TODO bucketName shall be passed via StorageConfig like index/stats task + bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue() + + reader, err := storage.NewBinlogRecordReader(ctx, + seg.GetFieldBinlogs(), + t.plan.GetSchema(), + storage.WithDownloader(t.binlogIO.Download), + storage.WithVersion(seg.GetStorageVersion()), + storage.WithBucketName(bucketName), + ) if err != nil { log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err)) return