package storage

import (
	"fmt"
	"io"
	"strconv"

	"github.com/apache/arrow/go/v17/arrow"
	"github.com/apache/arrow/go/v17/arrow/array"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/storagev2/packed"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/proto/indexcgopb"
	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

// RecordReader yields Records one at a time and must be closed when the
// caller is done with it. Within this file, io.EOF returned from Next is
// treated as "this reader has no more records".
type RecordReader interface {
	Next() (Record, error)
	Close() error
}

// packedRecordReader adapts a packed.PackedReader to the RecordReader
// interface. field2Col maps a field ID to the position of that field in
// typeutil.GetAllFieldSchemas(schema).
type packedRecordReader struct {
	reader    *packed.PackedReader
	field2Col map[FieldID]int
}

var _ RecordReader = (*packedRecordReader)(nil)

// Next reads the next arrow record from the underlying packed reader and
// wraps it together with the field-to-column mapping. Any error from the
// underlying reader is returned unchanged.
func (pr *packedRecordReader) Next() (Record, error) {
	rec, err := pr.reader.ReadNext()
	if err != nil {
		return nil, err
	}
	return NewSimpleArrowRecord(rec, pr.field2Col), nil
}

// Close closes the underlying packed reader, if there is one.
func (pr *packedRecordReader) Close() error {
	if pr.reader != nil {
		return pr.reader.Close()
	}
	return nil
}

// newPackedRecordReader builds a packedRecordReader over the given paths.
// The arrow schema is derived from the collection schema and the column
// mapping follows the order of typeutil.GetAllFieldSchemas(schema).
func newPackedRecordReader(
	paths []string,
	schema *schemapb.CollectionSchema,
	bufferSize int64,
	storageConfig *indexpb.StorageConfig,
	storagePluginContext *indexcgopb.StoragePluginContext,
) (*packedRecordReader, error) {
	arrowSchema, err := ConvertToArrowSchema(schema, true)
	if err != nil {
		// NOTE(review): confirm merr.WrapErrParameterInvalid formats its
		// arguments printf-style; the message is written as a format string.
		return nil, merr.WrapErrParameterInvalid("convert collection schema [%s] to arrow schema error: %s", schema.Name, err.Error())
	}

	allFields := typeutil.GetAllFieldSchemas(schema)
	field2Col := make(map[FieldID]int, len(allFields))
	for i, field := range allFields {
		field2Col[field.FieldID] = i
	}

	reader, err := packed.NewPackedReader(paths, arrowSchema, bufferSize, storageConfig, storagePluginContext)
	if err != nil {
		return nil, err
	}
	return &packedRecordReader{
		reader:    reader,
		field2Col: field2Col,
	}, nil
}

// NewRecordReaderFromManifest creates a RecordReader backed by a
// ManifestReader for the given manifest.
func NewRecordReaderFromManifest(manifest string,
	schema *schemapb.CollectionSchema,
	bufferSize int64,
	storageConfig *indexpb.StorageConfig,
	storagePluginContext *indexcgopb.StoragePluginContext,
) (RecordReader, error) {
	reader, err := NewManifestReader(manifest, schema, bufferSize, storageConfig, storagePluginContext)
	if err != nil {
		// Return an untyped nil so the interface value itself is nil instead
		// of wrapping a nil *ManifestReader.
		return nil, err
	}
	return reader, nil
}

var _ RecordReader = (*IterativeRecordReader)(nil)

// IterativeRecordReader chains several RecordReaders. iterate is called to
// obtain the next underlying reader and is expected to return io.EOF once
// there are none left; cur is the reader currently being drained (nil when
// no reader is open).
type IterativeRecordReader struct {
	cur     RecordReader
	iterate func() (RecordReader, error)
}

// Close implements RecordReader. It closes the reader currently being
// drained, if any.
func (ir *IterativeRecordReader) Close() error {
	if ir.cur != nil {
		return ir.cur.Close()
	}
	return nil
}

// Next returns the next record from the current underlying reader. When that
// reader reports io.EOF it is closed and dropped, and the next reader is
// fetched from iterate; readers that yield no records at all are skipped.
// Errors from iterate (including io.EOF when no readers remain), from closing
// an exhausted reader, or from reading are returned to the caller.
func (ir *IterativeRecordReader) Next() (Record, error) {
	for {
		if ir.cur == nil {
			next, err := ir.iterate()
			if err != nil {
				return nil, err
			}
			ir.cur = next
		}

		rec, err := ir.cur.Next()
		if err != io.EOF {
			return rec, err
		}

		if closeErr := ir.cur.Close(); closeErr != nil {
			return nil, closeErr
		}
		// Forget the closed reader so that neither Close nor a later Next
		// touches it again.
		ir.cur = nil
	}
}

// newIterativePackedRecordReader returns an IterativeRecordReader that opens
// one packedRecordReader per entry of paths, in order, and reports io.EOF
// once every entry has been handed out.
func newIterativePackedRecordReader(
	paths [][]string,
	schema *schemapb.CollectionSchema,
	bufferSize int64,
	storageConfig *indexpb.StorageConfig,
	storagePluginContext *indexcgopb.StoragePluginContext,
) *IterativeRecordReader {
	chunk := 0
	return &IterativeRecordReader{
		iterate: func() (RecordReader, error) {
			if chunk >= len(paths) {
				return nil, io.EOF
			}
			currentPaths := paths[chunk]
			chunk++
			reader, err := newPackedRecordReader(currentPaths, schema, bufferSize, storageConfig, storagePluginContext)
			if err != nil {
				// Explicit nil: a nil *packedRecordReader stored in a
				// RecordReader interface would compare as non-nil.
				return nil, err
			}
			return reader, nil
		},
	}
}

// ManifestReader reads records through a packed.FFIPackedReader that is
// opened from a manifest. neededColumns lists the column names requested
// from the reader, and field2Col maps a field ID to the position of that
// field in typeutil.GetAllFieldSchemas(schema).
type ManifestReader struct {
	fieldBinlogs         []*datapb.FieldBinlog
	manifest             string
	reader               *packed.FFIPackedReader
	bufferSize           int64
	arrowSchema          *arrow.Schema
	schema               *schemapb.CollectionSchema
	schemaHelper         *typeutil.SchemaHelper
	field2Col            map[FieldID]int
	storageConfig        *indexpb.StorageConfig
	storagePluginContext *indexcgopb.StoragePluginContext
	neededColumns        []string
}

var _ RecordReader = (*ManifestReader)(nil)

// NewManifestReaderFromBinlogs creates a ManifestReader from binlogs.
// Columns are requested by field name.
//
// NOTE(review): fieldBinlogs is stored on the reader, but init() only
// consults the manifest field, which this constructor leaves empty — confirm
// this is intended.
func NewManifestReaderFromBinlogs(fieldBinlogs []*datapb.FieldBinlog,
	schema *schemapb.CollectionSchema,
	bufferSize int64,
	storageConfig *indexpb.StorageConfig,
	storagePluginContext *indexcgopb.StoragePluginContext,
) (*ManifestReader, error) {
	arrowSchema, err := ConvertToArrowSchema(schema, false)
	if err != nil {
		// NOTE(review): confirm merr.WrapErrParameterInvalid formats its
		// arguments printf-style.
		return nil, merr.WrapErrParameterInvalid("convert collection schema [%s] to arrow schema error: %s", schema.Name, err.Error())
	}
	schemaHelper, err := typeutil.CreateSchemaHelper(schema)
	if err != nil {
		return nil, err
	}

	allFields := typeutil.GetAllFieldSchemas(schema)
	field2Col := make(map[FieldID]int, len(allFields))
	neededColumns := make([]string, 0, len(allFields))
	for i, field := range allFields {
		field2Col[field.FieldID] = i
		neededColumns = append(neededColumns, field.Name)
	}

	prr := &ManifestReader{
		fieldBinlogs:         fieldBinlogs,
		bufferSize:           bufferSize,
		arrowSchema:          arrowSchema,
		schema:               schema,
		schemaHelper:         schemaHelper,
		field2Col:            field2Col,
		storageConfig:        storageConfig,
		storagePluginContext: storagePluginContext,
		neededColumns:        neededColumns,
	}
	if err := prr.init(); err != nil {
		return nil, err
	}
	return prr, nil
}

// NewManifestReader creates a ManifestReader for the given manifest. Each
// column is requested by the field's ExternalField when that is set, and by
// the decimal field ID otherwise.
func NewManifestReader(manifest string,
	schema *schemapb.CollectionSchema,
	bufferSize int64,
	storageConfig *indexpb.StorageConfig,
	storagePluginContext *indexcgopb.StoragePluginContext,
) (*ManifestReader, error) {
	arrowSchema, err := ConvertToArrowSchema(schema, true)
	if err != nil {
		// NOTE(review): confirm merr.WrapErrParameterInvalid formats its
		// arguments printf-style.
		return nil, merr.WrapErrParameterInvalid("convert collection schema [%s] to arrow schema error: %s", schema.Name, err.Error())
	}
	schemaHelper, err := typeutil.CreateSchemaHelper(schema)
	if err != nil {
		return nil, err
	}

	allFields := typeutil.GetAllFieldSchemas(schema)
	field2Col := make(map[FieldID]int, len(allFields))
	neededColumns := make([]string, 0, len(allFields))
	for i, field := range allFields {
		field2Col[field.FieldID] = i
		// Use field id here or external field
		if field.ExternalField != "" {
			neededColumns = append(neededColumns, field.ExternalField)
		} else {
			neededColumns = append(neededColumns, strconv.FormatInt(field.FieldID, 10))
		}
	}

	prr := &ManifestReader{
		manifest:             manifest,
		bufferSize:           bufferSize,
		arrowSchema:          arrowSchema,
		schema:               schema,
		schemaHelper:         schemaHelper,
		field2Col:            field2Col,
		storageConfig:        storageConfig,
		storagePluginContext: storagePluginContext,
		neededColumns:        neededColumns,
	}
	if err := prr.init(); err != nil {
		return nil, err
	}
	return prr, nil
}

// init resolves the manifest via packed.GetManifest and opens the FFI packed
// reader over it with the configured schema, columns and buffer size.
func (mr *ManifestReader) init() error {
	// TODO add needed column option
	manifest, err := packed.GetManifest(mr.manifest, mr.storageConfig)
	if err != nil {
		return err
	}

	reader, err := packed.NewFFIPackedReader(manifest, mr.arrowSchema, mr.neededColumns, mr.bufferSize, mr.storageConfig, mr.storagePluginContext)
	if err != nil {
		return err
	}
	mr.reader = reader
	return nil
}

// Next reads the next arrow record from the underlying reader and wraps it
// together with the field-to-column mapping.
//
// NOTE(review): Next and Close use value receivers while init uses a pointer
// receiver; the value receivers are kept so the method set of ManifestReader
// is unchanged for existing callers.
func (mr ManifestReader) Next() (Record, error) {
	rec, err := mr.reader.ReadNext()
	if err != nil {
		return nil, err
	}
	return NewSimpleArrowRecord(rec, mr.field2Col), nil
}

// Close closes the underlying reader, if there is one.
func (mr ManifestReader) Close() error {
	if mr.reader != nil {
		return mr.reader.Close()
	}
	return nil
}

// ChunkedBlobsReader returns a chunk composed of blobs, or io.EOF if no more data
type ChunkedBlobsReader func() ([]*Blob, error)

// CompositeBinlogRecordReader combines one arrow record reader per field into
// a single RecordReader. index maps a field ID to the slot of that field in
// rrs (and in the arrays of each produced record); a nil slot in rrs means
// there is no reader for that field.
type CompositeBinlogRecordReader struct {
	fields map[FieldID]*schemapb.FieldSchema
	index  map[FieldID]int16
	brs    []*BinlogReader
	rrs    []array.RecordReader
}

var _ RecordReader = (*CompositeBinlogRecordReader)(nil)

// Next advances every per-field reader by one record and assembles the first
// column of each into a composite record. Fields without a reader are filled
// with an array generated by GenerateEmptyArrayFromSchema sized to the row
// count observed from the other fields.
//
// It returns io.EOF as soon as any per-field reader is exhausted, the
// reader's own error if it stopped because of a failure, and a service
// internal error if the per-field row counts disagree.
func (crr *CompositeBinlogRecordReader) Next() (Record, error) {
	recs := make([]arrow.Array, len(crr.fields))
	nonExistingFields := make([]*schemapb.FieldSchema, 0)
	nRows := 0
	for _, f := range crr.fields {
		idx := crr.index[f.FieldID]
		rr := crr.rrs[idx]
		if rr == nil {
			nonExistingFields = append(nonExistingFields, f)
			continue
		}

		if !rr.Next() {
			// Next reports false both at end of stream and on failure;
			// surface a real failure instead of masking it as io.EOF.
			if err := rr.Err(); err != nil && err != io.EOF {
				return nil, err
			}
			return nil, io.EOF
		}

		r := rr.Record()
		recs[idx] = r.Column(0)
		if nRows == 0 {
			nRows = int(r.NumRows())
		}
		if nRows != int(r.NumRows()) {
			return nil, merr.WrapErrServiceInternal(fmt.Sprintf("number of rows mismatch for field %d", f.FieldID))
		}
	}

	for _, f := range nonExistingFields {
		// If the field is not in the current batch, fill with null array
		arr, err := GenerateEmptyArrayFromSchema(f, nRows)
		if err != nil {
			return nil, err
		}
		recs[crr.index[f.FieldID]] = arr
	}
	return &compositeRecord{
		index: crr.index,
		recs:  recs,
	}, nil
}

// Close closes every non-nil binlog reader and releases every non-nil arrow
// record reader. It always returns nil.
func (crr *CompositeBinlogRecordReader) Close() error {
	// Ranging over a nil slice is a no-op, so no explicit nil guard is needed.
	for _, er := range crr.brs {
		if er != nil {
			er.Close()
		}
	}
	for _, rr := range crr.rrs {
		if rr != nil {
			rr.Release()
		}
	}
	return nil
}