enhance: [StorageV2] Pass bucket name for compaction readers (#42607)

Related to #39173

Like the logic in #41919, the storage v2 filesystem shall use complete paths
with the bucketName prefix to be compatible with its definition. This PR fills
in the bucket name from the MinIO config when creating readers for compaction
tasks.

NOTE: the bucket name shall be read from the task params config to support
compaction task pooling (see the TODO comments in the diff below).
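
For reference, the pattern applied at every reader construction site in this
diff is roughly the sketch below. The helper name newCompactionBinlogReader and
the import paths outside the diff (storage, schemapb) are illustrative
assumptions rather than part of this change; the option calls mirror the hunks
that follow.

import (
    "context"

    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "github.com/milvus-io/milvus/internal/storage"
    "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
    "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)

// newCompactionBinlogReader is not part of this change; it condenses the call
// sites in the diff into one place for illustration. The bucket name is read
// from the global MinIO config and handed to the binlog record reader so that
// storage v2 can build complete paths with the bucketName prefix.
func newCompactionBinlogReader(
    ctx context.Context,
    binlogs []*datapb.FieldBinlog,
    schema *schemapb.CollectionSchema,
    storageVersion int64,
    download func(ctx context.Context, paths []string) ([][]byte, error),
) (storage.RecordReader, error) {
    // TODO bucketName shall be passed via StorageConfig like index/stats task
    bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue()
    return storage.NewBinlogRecordReader(ctx,
        binlogs,
        schema,
        storage.WithDownloader(download),
        storage.WithVersion(storageVersion),
        storage.WithBucketName(bucketName),
    )
}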

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2025-06-10 10:20:35 +08:00 committed by GitHub
parent 118684afbb
commit a9aaa86193
3 changed files with 46 additions and 8 deletions

View File

@@ -587,9 +587,18 @@ func (t *clusteringCompactionTask) mappingSegment(
        return merr.WrapErrIllegalCompactionPlan()
    }
-   rr, err := storage.NewBinlogRecordReader(ctx, segment.GetFieldBinlogs(), t.plan.Schema, storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
-       return t.binlogIO.Download(ctx, paths)
-   }), storage.WithVersion(segment.StorageVersion), storage.WithBufferSize(t.memoryBufferSize))
+   // TODO bucketName shall be passed via StorageConfig like index/stats task
+   bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue()
+   rr, err := storage.NewBinlogRecordReader(ctx,
+       segment.GetFieldBinlogs(),
+       t.plan.Schema,
+       storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
+           return t.binlogIO.Download(ctx, paths)
+       }),
+       storage.WithVersion(segment.StorageVersion), storage.WithBufferSize(t.memoryBufferSize),
+       storage.WithBucketName(bucketName),
+   )
    if err != nil {
        log.Warn("new binlog record reader wrong", zap.Error(err))
        return err
@@ -856,9 +865,19 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
    expiredFilter := compaction.NewEntityFilter(nil, t.plan.GetCollectionTtl(), t.currentTime)
-   rr, err := storage.NewBinlogRecordReader(ctx, segment.GetFieldBinlogs(), t.plan.Schema, storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
-       return t.binlogIO.Download(ctx, paths)
-   }), storage.WithVersion(segment.StorageVersion), storage.WithBufferSize(t.memoryBufferSize))
+   // TODO bucketName shall be passed via StorageConfig like index/stats task
+   bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue()
+   rr, err := storage.NewBinlogRecordReader(ctx,
+       segment.GetFieldBinlogs(),
+       t.plan.Schema,
+       storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
+           return t.binlogIO.Download(ctx, paths)
+       }),
+       storage.WithVersion(segment.StorageVersion),
+       storage.WithBufferSize(t.memoryBufferSize),
+       storage.WithBucketName(bucketName),
+   )
    if err != nil {
        log.Warn("new binlog record reader wrong", zap.Error(err))
        return make(map[interface{}]int64), err

View File

@@ -18,6 +18,7 @@ import (
    "github.com/milvus-io/milvus/pkg/v2/log"
    "github.com/milvus-io/milvus/pkg/v2/metrics"
    "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
+   "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
    "github.com/milvus-io/milvus/pkg/v2/util/timerecord"
    "github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
@@ -56,10 +57,19 @@ func mergeSortMultipleSegments(ctx context.Context,
        return nil, err
    }
+   // TODO bucketName shall be passed via StorageConfig like index/stats task
+   bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue()
    segmentReaders := make([]storage.RecordReader, len(binlogs))
    segmentFilters := make([]compaction.EntityFilter, len(binlogs))
    for i, s := range binlogs {
-       reader, err := storage.NewBinlogRecordReader(ctx, s.GetFieldBinlogs(), plan.GetSchema(), storage.WithDownloader(binlogIO.Download), storage.WithVersion(s.StorageVersion))
+       reader, err := storage.NewBinlogRecordReader(ctx,
+           s.GetFieldBinlogs(),
+           plan.GetSchema(),
+           storage.WithDownloader(binlogIO.Download),
+           storage.WithVersion(s.StorageVersion),
+           storage.WithBucketName(bucketName),
+       )
        if err != nil {
            return nil, err
        }

View File

@@ -217,7 +217,16 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
    }
    entityFilter := compaction.NewEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)
-   reader, err := storage.NewBinlogRecordReader(ctx, seg.GetFieldBinlogs(), t.plan.GetSchema(), storage.WithDownloader(t.binlogIO.Download), storage.WithVersion(seg.GetStorageVersion()))
+   // TODO bucketName shall be passed via StorageConfig like index/stats task
+   bucketName := paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue()
+   reader, err := storage.NewBinlogRecordReader(ctx,
+       seg.GetFieldBinlogs(),
+       t.plan.GetSchema(),
+       storage.WithDownloader(t.binlogIO.Download),
+       storage.WithVersion(seg.GetStorageVersion()),
+       storage.WithBucketName(bucketName),
+   )
    if err != nil {
        log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err))
        return
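
As a possible follow-up to the TODO comments above, the bucket name would come
from the task params (StorageConfig), as index/stats tasks already do, instead
of the global MinIO config. The sketch below is hypothetical: the
GetStorageConfig accessor on the plan is assumed and does not exist in this PR.

import (
    "github.com/milvus-io/milvus/pkg/v2/proto/datapb"
    "github.com/milvus-io/milvus/pkg/v2/util/paramtable"
)

// bucketNameForTask sketches the direction the TODOs point at: prefer the
// bucket name carried in the task's storage config, and fall back to the
// global MinIO config as this PR does. plan.GetStorageConfig() is assumed
// here and is not part of the current datapb.CompactionPlan.
func bucketNameForTask(plan *datapb.CompactionPlan) string {
    if cfg := plan.GetStorageConfig(); cfg != nil && cfg.GetBucketName() != "" {
        return cfg.GetBucketName()
    }
    return paramtable.Get().ServiceParam.MinioCfg.BucketName.GetValue()
}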