mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-02-02 01:06:41 +08:00
Handle multiple parquet RowGroup/Column (#19309)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
3c3ba55cd7
commit
268b0d989e
@ -80,18 +80,26 @@ func (r *PayloadReader) GetBoolFromPayload() ([]bool, error) {
|
||||
if r.colType != schemapb.DataType_Bool {
|
||||
return nil, fmt.Errorf("failed to get bool from datatype %v", r.colType.String())
|
||||
}
|
||||
reader, ok := r.reader.RowGroup(0).Column(0).(*file.BooleanColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.BooleanColumnChunkReader, but got %T", r.reader.RowGroup(0).Column(0))
|
||||
|
||||
var offset int
|
||||
values := make([]bool, r.numRows)
|
||||
for i := 0; i < r.reader.NumRowGroups(); i++ {
|
||||
rg := r.reader.RowGroup(i)
|
||||
// hard-code 0-th column
|
||||
reader, ok := rg.Column(0).(*file.BooleanColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.BooleanColumnChunkReader, but got %T", rg.Column(0))
|
||||
}
|
||||
|
||||
_, valuesRead, err := reader.ReadBatch(r.numRows, values[offset:], nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
offset += valuesRead
|
||||
}
|
||||
|
||||
values := make([]bool, r.numRows)
|
||||
total, valuesRead, err := reader.ReadBatch(r.numRows, values, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if total != r.numRows || int64(valuesRead) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got total = %d and valuesRead = %d", r.numRows, total, valuesRead)
|
||||
if int64(offset) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, offset)
|
||||
}
|
||||
return values, nil
|
||||
}
|
||||
@ -101,18 +109,25 @@ func (r *PayloadReader) GetByteFromPayload() ([]byte, error) {
|
||||
if r.colType != schemapb.DataType_Int8 {
|
||||
return nil, fmt.Errorf("failed to get byte from datatype %v", r.colType.String())
|
||||
}
|
||||
reader, ok := r.reader.RowGroup(0).Column(0).(*file.Int32ColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.Int32ColumnChunkReader, but got %T", r.reader.RowGroup(0).Column(0))
|
||||
|
||||
var offset int
|
||||
values := make([]int32, r.numRows)
|
||||
for i := 0; i < r.reader.NumRowGroups(); i++ {
|
||||
rg := r.reader.RowGroup(i)
|
||||
reader, ok := rg.Column(0).(*file.Int32ColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.Int32ColumnChunkReader, but got %T", rg.Column(0))
|
||||
}
|
||||
|
||||
_, valuesRead, err := reader.ReadBatch(r.numRows, values[offset:], nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
offset += valuesRead
|
||||
}
|
||||
|
||||
values := make([]int32, r.numRows)
|
||||
total, valuesRead, err := reader.ReadBatch(r.numRows, values, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if total != r.numRows || int64(valuesRead) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got total = %d and valuesRead = %d", r.numRows, total, valuesRead)
|
||||
if int64(offset) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, offset)
|
||||
}
|
||||
|
||||
ret := make([]byte, r.numRows)
|
||||
@ -128,17 +143,24 @@ func (r *PayloadReader) GetInt8FromPayload() ([]int8, error) {
|
||||
return nil, fmt.Errorf("failed to get int8 from datatype %v", r.colType.String())
|
||||
}
|
||||
|
||||
reader, ok := r.reader.RowGroup(0).Column(0).(*file.Int32ColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.Int32ColumnChunkReader, but got %T", r.reader.RowGroup(0).Column(0))
|
||||
}
|
||||
var offset int
|
||||
values := make([]int32, r.numRows)
|
||||
total, valuesRead, err := reader.ReadBatch(r.numRows, values, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
for i := 0; i < r.reader.NumRowGroups(); i++ {
|
||||
rg := r.reader.RowGroup(i)
|
||||
reader, ok := rg.Column(0).(*file.Int32ColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.Int32ColumnChunkReader, but got %T", rg.Column(0))
|
||||
}
|
||||
|
||||
_, valuesRead, err := reader.ReadBatch(r.numRows, values[offset:], nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
offset += valuesRead
|
||||
}
|
||||
if total != r.numRows || int64(valuesRead) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got total = %d and valuesRead = %d", r.numRows, total, valuesRead)
|
||||
|
||||
if int64(offset) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, offset)
|
||||
}
|
||||
|
||||
ret := make([]int8, r.numRows)
|
||||
@ -153,17 +175,23 @@ func (r *PayloadReader) GetInt16FromPayload() ([]int16, error) {
|
||||
return nil, fmt.Errorf("failed to get int16 from datatype %v", r.colType.String())
|
||||
}
|
||||
|
||||
reader, ok := r.reader.RowGroup(0).Column(0).(*file.Int32ColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.Int32ColumnChunkReader, but got %T", r.reader.RowGroup(0).Column(0))
|
||||
}
|
||||
var offset int
|
||||
values := make([]int32, r.numRows)
|
||||
total, valuesRead, err := reader.ReadBatch(r.numRows, values, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
for i := 0; i < r.reader.NumRowGroups(); i++ {
|
||||
rg := r.reader.RowGroup(i)
|
||||
reader, ok := rg.Column(0).(*file.Int32ColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.Int32ColumnChunkReader, but got %T", rg.Column(0))
|
||||
}
|
||||
|
||||
_, valuesRead, err := reader.ReadBatch(r.numRows, values[offset:], nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
offset += valuesRead
|
||||
}
|
||||
if total != r.numRows || int64(valuesRead) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got total = %d and valuesRead = %d", r.numRows, total, valuesRead)
|
||||
if int64(offset) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, offset)
|
||||
}
|
||||
|
||||
ret := make([]int16, r.numRows)
|
||||
@ -178,19 +206,24 @@ func (r *PayloadReader) GetInt32FromPayload() ([]int32, error) {
|
||||
return nil, fmt.Errorf("failed to get int32 from datatype %v", r.colType.String())
|
||||
}
|
||||
|
||||
reader, ok := r.reader.RowGroup(0).Column(0).(*file.Int32ColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.Int32ColumnChunkReader, but got %T", r.reader.RowGroup(0).Column(0))
|
||||
}
|
||||
var offset int
|
||||
values := make([]int32, r.numRows)
|
||||
total, valuesRead, err := reader.ReadBatch(r.numRows, values, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if total != r.numRows || int64(valuesRead) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got total = %d and valuesRead = %d", r.numRows, total, valuesRead)
|
||||
}
|
||||
for i := 0; i < r.reader.NumRowGroups(); i++ {
|
||||
rg := r.reader.RowGroup(i)
|
||||
reader, ok := rg.Column(0).(*file.Int32ColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.Int32ColumnChunkReader, but got %T", rg.Column(0))
|
||||
}
|
||||
|
||||
_, valuesRead, err := reader.ReadBatch(r.numRows, values[offset:], nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
offset += valuesRead
|
||||
}
|
||||
if int64(offset) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, offset)
|
||||
}
|
||||
return values, nil
|
||||
}
|
||||
|
||||
@ -198,19 +231,26 @@ func (r *PayloadReader) GetInt64FromPayload() ([]int64, error) {
|
||||
if r.colType != schemapb.DataType_Int64 {
|
||||
return nil, fmt.Errorf("failed to get int64 from datatype %v", r.colType.String())
|
||||
}
|
||||
reader, ok := r.reader.RowGroup(0).Column(0).(*file.Int64ColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.Int64ColumnChunkReader, but got %T", r.reader.RowGroup(0).Column(0))
|
||||
|
||||
var offset int
|
||||
values := make([]int64, r.numRows)
|
||||
for i := 0; i < r.reader.NumRowGroups(); i++ {
|
||||
rg := r.reader.RowGroup(i)
|
||||
reader, ok := rg.Column(0).(*file.Int64ColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.Int64ColumnChunkReader, but got %T", rg.Column(0))
|
||||
}
|
||||
|
||||
_, valuesRead, err := reader.ReadBatch(r.numRows, values[offset:], nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
offset += valuesRead
|
||||
}
|
||||
if int64(offset) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, offset)
|
||||
}
|
||||
|
||||
values := make([]int64, r.numRows)
|
||||
total, valuesRead, err := reader.ReadBatch(r.numRows, values, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if total != r.numRows || int64(valuesRead) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got total = %d and valuesRead = %d", r.numRows, total, valuesRead)
|
||||
}
|
||||
return values, nil
|
||||
}
|
||||
|
||||
@ -218,18 +258,24 @@ func (r *PayloadReader) GetFloatFromPayload() ([]float32, error) {
|
||||
if r.colType != schemapb.DataType_Float {
|
||||
return nil, fmt.Errorf("failed to get float32 from datatype %v", r.colType.String())
|
||||
}
|
||||
reader, ok := r.reader.RowGroup(0).Column(0).(*file.Float32ColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.Float32ColumnChunkReader, but got %T", r.reader.RowGroup(0).Column(0))
|
||||
}
|
||||
|
||||
var offset int
|
||||
values := make([]float32, r.numRows)
|
||||
total, valuesRead, err := reader.ReadBatch(r.numRows, values, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
for i := 0; i < r.reader.NumRowGroups(); i++ {
|
||||
rg := r.reader.RowGroup(i)
|
||||
reader, ok := rg.Column(0).(*file.Float32ColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.Float32ColumnChunkReader, but got %T", rg.Column(0))
|
||||
}
|
||||
|
||||
_, valuesRead, err := reader.ReadBatch(r.numRows, values[offset:], nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
offset += valuesRead
|
||||
}
|
||||
if total != r.numRows || int64(valuesRead) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got total = %d and valuesRead = %d", r.numRows, total, valuesRead)
|
||||
if int64(offset) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, offset)
|
||||
}
|
||||
return values, nil
|
||||
}
|
||||
@ -238,18 +284,24 @@ func (r *PayloadReader) GetDoubleFromPayload() ([]float64, error) {
|
||||
if r.colType != schemapb.DataType_Double {
|
||||
return nil, fmt.Errorf("failed to get float32 from datatype %v", r.colType.String())
|
||||
}
|
||||
reader, ok := r.reader.RowGroup(0).Column(0).(*file.Float64ColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.Float64ColumnChunkReader, but got %T", r.reader.RowGroup(0).Column(0))
|
||||
}
|
||||
|
||||
var offset int
|
||||
values := make([]float64, r.numRows)
|
||||
total, valuesRead, err := reader.ReadBatch(r.numRows, values, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
for i := 0; i < r.reader.NumRowGroups(); i++ {
|
||||
rg := r.reader.RowGroup(i)
|
||||
reader, ok := rg.Column(0).(*file.Float64ColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.Float64ColumnChunkReader, but got %T", rg.Column(0))
|
||||
}
|
||||
|
||||
_, valuesRead, err := reader.ReadBatch(r.numRows, values[offset:], nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
offset += valuesRead
|
||||
}
|
||||
if total != r.numRows || int64(valuesRead) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got total = %d and valuesRead = %d", r.numRows, total, valuesRead)
|
||||
if int64(offset) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, offset)
|
||||
}
|
||||
return values, nil
|
||||
}
|
||||
@ -259,17 +311,23 @@ func (r *PayloadReader) GetStringFromPayload() ([]string, error) {
|
||||
return nil, fmt.Errorf("failed to get string from datatype %v", r.colType.String())
|
||||
}
|
||||
|
||||
reader, ok := r.reader.RowGroup(0).Column(0).(*file.ByteArrayColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.ByteArrayColumnChunkReader, but got %T", r.reader.RowGroup(0).Column(0))
|
||||
}
|
||||
var offset int
|
||||
values := make([]parquet.ByteArray, r.numRows)
|
||||
total, valuesRead, err := reader.ReadBatch(r.numRows, values, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
for i := 0; i < r.reader.NumRowGroups(); i++ {
|
||||
rg := r.reader.RowGroup(i)
|
||||
reader, ok := rg.Column(0).(*file.ByteArrayColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expect type *file.ByteArrayColumnChunkReader, but got %T", rg.Column(0))
|
||||
}
|
||||
|
||||
_, valuesRead, err := reader.ReadBatch(r.numRows, values[offset:], nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
offset += valuesRead
|
||||
}
|
||||
if total != r.numRows || int64(valuesRead) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got total = %d and valuesRead = %d", r.numRows, total, valuesRead)
|
||||
if int64(offset) != r.numRows {
|
||||
return nil, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, offset)
|
||||
}
|
||||
|
||||
ret := make([]string, r.numRows)
|
||||
@ -285,20 +343,26 @@ func (r *PayloadReader) GetBinaryVectorFromPayload() ([]byte, int, error) {
|
||||
return nil, -1, fmt.Errorf("failed to get binary vector from datatype %v", r.colType.String())
|
||||
}
|
||||
|
||||
reader, ok := r.reader.RowGroup(0).Column(0).(*file.FixedLenByteArrayColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, -1, fmt.Errorf("expect type *file.FixedLenByteArrayColumnChunkReader, but got %T", r.reader.RowGroup(0).Column(0))
|
||||
dim := r.reader.RowGroup(0).Column(0).Descriptor().TypeLength()
|
||||
var offset int
|
||||
values := make([]parquet.FixedLenByteArray, r.numRows)
|
||||
for i := 0; i < r.reader.NumRowGroups(); i++ {
|
||||
rg := r.reader.RowGroup(i)
|
||||
reader, ok := rg.Column(0).(*file.FixedLenByteArrayColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, -1, fmt.Errorf("expect type *file.FixedLenByteArrayColumnChunkReader, but got %T", rg.Column(0))
|
||||
}
|
||||
|
||||
_, valuesRead, err := reader.ReadBatch(r.numRows, values[offset:], nil, nil)
|
||||
if err != nil {
|
||||
return nil, -1, err
|
||||
}
|
||||
offset += valuesRead
|
||||
}
|
||||
if int64(offset) != r.numRows {
|
||||
return nil, 0, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, offset)
|
||||
}
|
||||
|
||||
dim := r.reader.RowGroup(0).Column(0).Descriptor().TypeLength()
|
||||
values := make([]parquet.FixedLenByteArray, r.numRows)
|
||||
total, valuesRead, err := reader.ReadBatch(r.numRows, values, nil, nil)
|
||||
if err != nil {
|
||||
return nil, -1, err
|
||||
}
|
||||
if total != r.numRows || int64(valuesRead) != r.numRows {
|
||||
return nil, -1, fmt.Errorf("expect %d rows, but got total = %d and valuesRead = %d", r.numRows, total, valuesRead)
|
||||
}
|
||||
ret := make([]byte, int64(dim)*r.numRows)
|
||||
for i := 0; i < int(r.numRows); i++ {
|
||||
copy(ret[i*dim:(i+1)*dim], values[i])
|
||||
@ -311,19 +375,24 @@ func (r *PayloadReader) GetFloatVectorFromPayload() ([]float32, int, error) {
|
||||
if r.colType != schemapb.DataType_FloatVector {
|
||||
return nil, -1, fmt.Errorf("failed to get float vector from datatype %v", r.colType.String())
|
||||
}
|
||||
reader, ok := r.reader.RowGroup(0).Column(0).(*file.FixedLenByteArrayColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, -1, fmt.Errorf("expect type *file.FixedLenByteArrayColumnChunkReader, but got %T", r.reader.RowGroup(0).Column(0))
|
||||
}
|
||||
|
||||
dim := r.reader.RowGroup(0).Column(0).Descriptor().TypeLength() / 4
|
||||
var offset int
|
||||
values := make([]parquet.FixedLenByteArray, r.numRows)
|
||||
total, valuesRead, err := reader.ReadBatch(r.numRows, values, nil, nil)
|
||||
if err != nil {
|
||||
return nil, -1, err
|
||||
for i := 0; i < r.reader.NumRowGroups(); i++ {
|
||||
rg := r.reader.RowGroup(i)
|
||||
reader, ok := rg.Column(0).(*file.FixedLenByteArrayColumnChunkReader)
|
||||
if !ok {
|
||||
return nil, -1, fmt.Errorf("expect type *file.FixedLenByteArrayColumnChunkReader, but got %T", rg.Column(0))
|
||||
}
|
||||
|
||||
_, valuesRead, err := reader.ReadBatch(r.numRows, values[offset:], nil, nil)
|
||||
if err != nil {
|
||||
return nil, -1, err
|
||||
}
|
||||
offset += valuesRead
|
||||
}
|
||||
if total != r.numRows || int64(valuesRead) != r.numRows {
|
||||
return nil, -1, fmt.Errorf("expect %d rows, but got total = %d and valuesRead = %d", r.numRows, total, valuesRead)
|
||||
if int64(offset) != r.numRows {
|
||||
return nil, -1, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, offset)
|
||||
}
|
||||
ret := make([]float32, int64(dim)*r.numRows)
|
||||
for i := 0; i < int(r.numRows); i++ {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user