From 279cf6f5e56ff6eb75ecfb4fffa48bd4e584f3c0 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Wed, 16 Nov 2022 19:47:08 +0800 Subject: [PATCH] Revert Index Codec (#20652) Signed-off-by: xiaofan-luan Signed-off-by: xiaofan-luan --- internal/storage/payload_reader.go | 125 +++++++++++------------------ 1 file changed, 47 insertions(+), 78 deletions(-) diff --git a/internal/storage/payload_reader.go b/internal/storage/payload_reader.go index d16b8fc219..bc0d9a6181 100644 --- a/internal/storage/payload_reader.go +++ b/internal/storage/payload_reader.go @@ -100,26 +100,19 @@ func (r *PayloadReader) GetByteFromPayload() ([]byte, error) { return nil, fmt.Errorf("failed to get byte from datatype %v", r.colType.String()) } - ret := make([]byte, r.numRows) - values := make([]int32, 4096) - offset := int64(0) - for offset < r.numRows { - batch := int64(4096) - if r.numRows-offset < 4096 { - batch = r.numRows - offset - } - valuesRead, err := ReadDataFromAllRowGroups[int32, *file.Int32ColumnChunkReader](r.reader, values, offset, batch) - if err != nil { - return nil, err - } - if valuesRead != batch { - return nil, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, valuesRead) - } + values := make([]int32, r.numRows) + valuesRead, err := ReadDataFromAllRowGroups[int32, *file.Int32ColumnChunkReader](r.reader, values, 0, r.numRows) + if err != nil { + return nil, err + } - for i := int64(0); i < batch; i++ { - ret[offset+i] = byte(values[i]) - } - offset += batch + if valuesRead != r.numRows { + return nil, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, valuesRead) + } + + ret := make([]byte, r.numRows) + for i := int64(0); i < r.numRows; i++ { + ret[i] = byte(values[i]) } return ret, nil } @@ -130,26 +123,19 @@ func (r *PayloadReader) GetInt8FromPayload() ([]int8, error) { return nil, fmt.Errorf("failed to get int8 from datatype %v", r.colType.String()) } - ret := make([]int8, r.numRows) - values := make([]int32, 4096) - offset := int64(0) - for offset < r.numRows { - batch := int64(4096) - if r.numRows-offset < 4096 { - batch = r.numRows - offset - } - valuesRead, err := ReadDataFromAllRowGroups[int32, *file.Int32ColumnChunkReader](r.reader, values, offset, batch) - if err != nil { - return nil, err - } - if valuesRead != batch { - return nil, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, valuesRead) - } + values := make([]int32, r.numRows) + valuesRead, err := ReadDataFromAllRowGroups[int32, *file.Int32ColumnChunkReader](r.reader, values, 0, r.numRows) + if err != nil { + return nil, err + } - for i := int64(0); i < batch; i++ { - ret[offset+i] = int8(values[i]) - } - offset += batch + if valuesRead != r.numRows { + return nil, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, valuesRead) + } + + ret := make([]int8, r.numRows) + for i := int64(0); i < r.numRows; i++ { + ret[i] = int8(values[i]) } return ret, nil } @@ -159,26 +145,19 @@ func (r *PayloadReader) GetInt16FromPayload() ([]int16, error) { return nil, fmt.Errorf("failed to get int16 from datatype %v", r.colType.String()) } - ret := make([]int16, r.numRows) - values := make([]int32, 4096) - offset := int64(0) - for offset < r.numRows { - batch := int64(4096) - if r.numRows-offset < 4096 { - batch = r.numRows - offset - } - valuesRead, err := ReadDataFromAllRowGroups[int32, *file.Int32ColumnChunkReader](r.reader, values, offset, batch) - if err != nil { - return nil, err - } - if valuesRead != batch { - return nil, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, valuesRead) - } + values := make([]int32, r.numRows) + valuesRead, err := ReadDataFromAllRowGroups[int32, *file.Int32ColumnChunkReader](r.reader, values, 0, r.numRows) + if err != nil { + return nil, err + } - for i := int64(0); i < batch; i++ { - ret[offset+i] = int16(values[i]) - } - offset += batch + if valuesRead != r.numRows { + return nil, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, valuesRead) + } + + ret := make([]int16, r.numRows) + for i := int64(0); i < r.numRows; i++ { + ret[i] = int16(values[i]) } return ret, nil } @@ -334,39 +313,29 @@ func (r *PayloadReader) Close() { // ReadDataFromAllRowGroups iterates all row groups of file.Reader, and convert column to E. // then calls ReadBatch with provided parameters. -func ReadDataFromAllRowGroups[T any, - E interface { - ReadBatch(int64, []T, []int16, []int16) (int64, int, error) - Skip(nvalues int64) (int64, error) - }, -](reader *file.Reader, values []T, offset int64, numRows int64) (int64, error) { - var read int64 +func ReadDataFromAllRowGroups[T any, E interface { + ReadBatch(int64, []T, []int16, []int16) (int64, int, error) +}](reader *file.Reader, values []T, columnIdx int, numRows int64) (int64, error) { + var offset int64 for i := 0; i < reader.NumRowGroups(); i++ { - rowGroup := reader.RowGroup(i) - rows := rowGroup.NumRows() - if offset >= rows { - offset -= rows - continue + if columnIdx >= reader.RowGroup(i).NumColumns() { + return -1, fmt.Errorf("try to fetch %d-th column of reader but row group has only %d column(s)", columnIdx, reader.RowGroup(i).NumColumns()) } - - column := rowGroup.Column(0) + column := reader.RowGroup(i).Column(columnIdx) cReader, ok := column.(E) if !ok { return -1, fmt.Errorf("expect type %T, but got %T", *new(E), column) } - // skip the rest of the offset - cReader.Skip(offset) - offset = 0 - - _, valuesRead, err := cReader.ReadBatch(numRows, values[read:], nil, nil) + _, valuesRead, err := cReader.ReadBatch(numRows, values[offset:], nil, nil) if err != nil { return -1, err } - read += int64(valuesRead) + + offset += int64(valuesRead) } - return read, nil + return offset, nil }