From 1cf8ed505f21d443d6551bfe926b5332baf6f84d Mon Sep 17 00:00:00 2001
From: congqixia
Date: Thu, 24 Jul 2025 00:22:54 +0800
Subject: [PATCH] fix: Implement `NeededFields` feature in `RecordReader` (#43523)

Related to #43522

Currently, passing a partial schema to the storage v2 packed reader may
trigger a SEGV during the clustering compaction unit test.

This patch implements `NeededFields` differently in each `RecordReader`
implementation. For now, the v2 reader implements it as a no-op; this
will be supported once the packed reader supports this API.
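
A minimal caller-side sketch of the new option (not part of this patch;
the helper name, `pkFieldID`, and `downloader` are placeholders, and the
`storage`, `typeutil`, `datapb`, and `schemapb` packages are the ones
touched by the diff below):

    // Sketch: open a binlog reader that only needs row ID, timestamp and
    // the primary key column. Storage v1 prunes the reader schema
    // accordingly; storage v2 currently treats the needed fields as a no-op.
    func openPrunedReader(ctx context.Context, binlogs []*datapb.FieldBinlog,
        schema *schemapb.CollectionSchema, pkFieldID int64,
        downloader func(ctx context.Context, paths []string) ([][]byte, error),
    ) (storage.RecordReader, error) {
        needed := typeutil.NewSet[int64]()
        needed.Insert(0, 1, pkFieldID) // 0 = row ID field, 1 = timestamp field

        return storage.NewBinlogRecordReader(ctx, binlogs, schema,
            storage.WithDownloader(downloader),
            storage.WithVersion(storage.StorageV1),
            storage.WithNeededFields(needed),
        )
    }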

---------

Signed-off-by: Congqi Xia
---
 .../compactor/clustering_compactor.go | 16 ++++-------
 internal/storage/rw.go                | 28 +++++++++++++++----
 internal/storage/serde.go             |  1 +
 internal/storage/serde_events.go      | 21 ++++++++++++++
 internal/storage/serde_events_v2.go   |  5 ++++
 5 files changed, 56 insertions(+), 15 deletions(-)

diff --git a/internal/datanode/compactor/clustering_compactor.go b/internal/datanode/compactor/clustering_compactor.go
index e9e941dc49..7e20b1f370 100644
--- a/internal/datanode/compactor/clustering_compactor.go
+++ b/internal/datanode/compactor/clustering_compactor.go
@@ -866,18 +866,13 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 	log.Debug("binlogNum", zap.Int("binlogNum", binlogNum))
 
 	expiredFilter := compaction.NewEntityFilter(nil, t.plan.GetCollectionTtl(), t.currentTime)
-	schema := &schemapb.CollectionSchema{
-		Fields: make([]*schemapb.FieldSchema, 0),
-	}
 	binlogs := make([]*datapb.FieldBinlog, 0)
 	requiredFields := typeutil.NewSet[int64]()
 	requiredFields.Insert(0, 1, t.primaryKeyField.GetFieldID(), t.clusteringKeyField.GetFieldID())
-	for _, field := range t.plan.GetSchema().GetFields() {
-		if requiredFields.Contain(field.GetFieldID()) {
-			schema.Fields = append(schema.Fields, field)
-		}
-	}
+	selectedFields := lo.Filter(t.plan.GetSchema().GetFields(), func(field *schemapb.FieldSchema, _ int) bool {
+		return requiredFields.Contain(field.GetFieldID())
+	})
 
 	switch segment.GetStorageVersion() {
 	case storage.StorageV1:
@@ -894,13 +889,14 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 	}
 
 	rr, err := storage.NewBinlogRecordReader(ctx, binlogs,
-		schema,
+		t.plan.GetSchema(),
 		storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
 			return t.binlogIO.Download(ctx, paths)
 		}),
 		storage.WithVersion(segment.StorageVersion),
 		storage.WithBufferSize(t.bufferSize),
 		storage.WithStorageConfig(t.compactionParams.StorageConfig),
+		storage.WithNeededFields(requiredFields),
 	)
 	if err != nil {
 		log.Warn("new binlog record reader wrong", zap.Error(err))
@@ -908,7 +904,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
 	}
 
 	pkIter := storage.NewDeserializeReader(rr, func(r storage.Record, v []*storage.Value) error {
-		return storage.ValueDeserializer(r, v, schema.Fields)
+		return storage.ValueDeserializer(r, v, selectedFields)
 	})
 	defer pkIter.Close()
 	analyzeResult, remained, err := t.iterAndGetScalarAnalyzeResult(pkIter, expiredFilter)
diff --git a/internal/storage/rw.go b/internal/storage/rw.go
index 28c1104d2d..269936ccb0 100644
--- a/internal/storage/rw.go
+++ b/internal/storage/rw.go
@@ -32,6 +32,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
 	"github.com/milvus-io/milvus/pkg/v2/util/merr"
+	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
 
 const (
@@ -61,6 +62,7 @@ type rwOptions struct {
 	multiPartUploadSize int64
 	columnGroups        []storagecommon.ColumnGroup
 	storageConfig       *indexpb.StorageConfig
+	neededFields        typeutil.Set[int64]
 }
 
 func (o *rwOptions) validate() error {
@@ -141,6 +143,12 @@ func WithStorageConfig(storageConfig *indexpb.StorageConfig) RwOption {
 	}
 }
 
+func WithNeededFields(neededFields typeutil.Set[int64]) RwOption {
+	return func(options *rwOptions) {
+		options.neededFields = neededFields
+	}
+}
+
 func makeBlobsReader(ctx context.Context, binlogs []*datapb.FieldBinlog, downloader downloaderFn) (ChunkedBlobsReader, error) {
 	if len(binlogs) == 0 {
 		return func() ([]*Blob, error) {
@@ -212,7 +220,7 @@ func makeBlobsReader(ctx context.Context, binlogs []*datapb.FieldBinlog, downloa
 	}, nil
 }
 
-func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, schema *schemapb.CollectionSchema, option ...RwOption) (RecordReader, error) {
+func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, schema *schemapb.CollectionSchema, option ...RwOption) (rr RecordReader, err error) {
 	rwOptions := DefaultReaderOptions()
 	for _, opt := range option {
 		opt(rwOptions)
@@ -222,11 +230,13 @@ func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, s
 	}
 	switch rwOptions.version {
 	case StorageV1:
-		blobsReader, err := makeBlobsReader(ctx, binlogs, rwOptions.downloader)
+		var blobsReader ChunkedBlobsReader
+		blobsReader, err = makeBlobsReader(ctx, binlogs, rwOptions.downloader)
 		if err != nil {
 			return nil, err
 		}
-		return newCompositeBinlogRecordReader(schema, blobsReader)
+
+		rr, err = newCompositeBinlogRecordReader(schema, blobsReader)
 	case StorageV2:
 		if len(binlogs) <= 0 {
 			return nil, sio.EOF
@@ -245,9 +255,17 @@ func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, s
 				paths[j] = append(paths[j], logPath)
 			}
 		}
-		return newPackedRecordReader(paths, schema, rwOptions.bufferSize, rwOptions.storageConfig)
+		rr, err = newPackedRecordReader(paths, schema, rwOptions.bufferSize, rwOptions.storageConfig)
+	default:
+		return nil, merr.WrapErrServiceInternal(fmt.Sprintf("unsupported storage version %d", rwOptions.version))
 	}
-	return nil, merr.WrapErrServiceInternal(fmt.Sprintf("unsupported storage version %d", rwOptions.version))
+	if err != nil {
+		return nil, err
+	}
+	if rwOptions.neededFields != nil {
+		rr.SetNeededFields(rwOptions.neededFields)
+	}
+	return rr, nil
 }
 
 func NewBinlogRecordWriter(ctx context.Context, collectionID, partitionID, segmentID UniqueID,
diff --git a/internal/storage/serde.go b/internal/storage/serde.go
index 23a2ed0a10..9c156f49a0 100644
--- a/internal/storage/serde.go
+++ b/internal/storage/serde.go
@@ -44,6 +44,7 @@ type Record interface {
 
 type RecordReader interface {
 	Next() (Record, error)
+	SetNeededFields(fields typeutil.Set[int64])
 	Close() error
 }
diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go
index ed5b5c1f3d..a409a4e81d 100644
--- a/internal/storage/serde_events.go
+++ b/internal/storage/serde_events.go
@@ -151,6 +151,23 @@ func (crr *CompositeBinlogRecordReader) Next() (Record, error) {
 	return r, nil
 }
 
+func (crr *CompositeBinlogRecordReader) SetNeededFields(neededFields typeutil.Set[int64]) {
+	if neededFields == nil {
+		return
+	}
+
+	crr.schema = &schemapb.CollectionSchema{
+		Fields: lo.Filter(crr.schema.GetFields(), func(field *schemapb.FieldSchema, _ int) bool {
+			return neededFields.Contain(field.GetFieldID())
+		}),
+	}
+	index := make(map[FieldID]int16)
+	for i, f := range crr.schema.Fields {
+		index[f.FieldID] = int16(i)
+	}
+	crr.index = index
+}
+
 func (crr *CompositeBinlogRecordReader) Close() error {
 	if crr.brs != nil {
 		for _, er := range crr.brs {
@@ -1042,6 +1059,10 @@ func (crr *simpleArrowRecordReader) Next() (Record, error) {
 	return &crr.r, nil
 }
 
+func (crr *simpleArrowRecordReader) SetNeededFields(_ typeutil.Set[int64]) {
+	// no-op for simple arrow record reader
+}
+
 func (crr *simpleArrowRecordReader) Close() error {
 	if crr.closer != nil {
 		crr.closer()
diff --git a/internal/storage/serde_events_v2.go b/internal/storage/serde_events_v2.go
index 735cb91485..42f48648e1 100644
--- a/internal/storage/serde_events_v2.go
+++ b/internal/storage/serde_events_v2.go
@@ -95,6 +95,11 @@ func (pr *packedRecordReader) Next() (Record, error) {
 	}
 }
 
+func (pr *packedRecordReader) SetNeededFields(fields typeutil.Set[int64]) {
+	// TODO: push down SetNeededFields to packedReader once it supports this API
+	// no-op for now
+}
+
 func (pr *packedRecordReader) Close() error {
 	if pr.reader != nil {
 		return pr.reader.Close()