// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
	"bytes"
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"strconv"

	"github.com/apache/arrow/go/v17/arrow"
	"github.com/apache/arrow/go/v17/arrow/array"
	"github.com/apache/arrow/go/v17/arrow/memory"
	"github.com/cockroachdb/errors"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/pkg/v2/common"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

// newDeltalogOneFieldReader creates a reader for the old single-field deltalog format
func newDeltalogOneFieldReader(blobs []*Blob) (*DeserializeReaderImpl[*DeleteLog], error) {
	reader := newIterativeCompositeBinlogRecordReader(
		&schemapb.CollectionSchema{
			Fields: []*schemapb.FieldSchema{
				{
					DataType: schemapb.DataType_VarChar,
				},
			},
		},
		nil,
		MakeBlobsReader(blobs))
	return NewDeserializeReader(reader, func(r Record, v []*DeleteLog) error {
		for i := 0; i < r.Len(); i++ {
			if v[i] == nil {
				v[i] = &DeleteLog{}
			}
			// retrieve the only field
			a := r.(*compositeRecord).recs[0].(*array.String)
			strVal := a.Value(i)
			if err := v[i].Parse(strVal); err != nil {
				return err
			}
		}
		return nil
	}), nil
}

// DeltalogStreamWriter writes deltalog in the old JSON format
type DeltalogStreamWriter struct {
	collectionID UniqueID
	partitionID  UniqueID
	segmentID    UniqueID
	fieldSchema  *schemapb.FieldSchema

	buf bytes.Buffer
	rw  *singleFieldRecordWriter
}

func (dsw *DeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
	if dsw.rw != nil {
		return dsw.rw, nil
	}
	rw, err := newSingleFieldRecordWriter(dsw.fieldSchema, &dsw.buf, WithRecordWriterProps(getFieldWriterProps(dsw.fieldSchema)))
	if err != nil {
		return nil, err
	}
	dsw.rw = rw
	return rw, nil
}

func (dsw *DeltalogStreamWriter) Finalize() (*Blob, error) {
	if dsw.rw == nil {
		return nil, io.ErrUnexpectedEOF
	}
	dsw.rw.Close()

	var b bytes.Buffer
	if err := dsw.writeDeltalogHeaders(&b); err != nil {
		return nil, err
	}
	if _, err := b.Write(dsw.buf.Bytes()); err != nil {
		return nil, err
	}
	return &Blob{
		Value:      b.Bytes(),
		RowNum:     int64(dsw.rw.numRows),
		MemorySize: int64(dsw.rw.writtenUncompressed),
	}, nil
}

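// writeDeltalogHeaders writes the binlog framing that precedes the payload in
// the finalized blob: the magic number, a descriptor event carrying the
// collection/partition/segment IDs and the original (uncompressed) size, and a
// delete event header plus delete event data. Finalize appends the buffered
// payload right after these headers.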
func (dsw *DeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) error {
	// Write magic number
	if err := binary.Write(w, common.Endian, MagicNumber); err != nil {
		return err
	}
	// Write descriptor
	de := NewBaseDescriptorEvent(dsw.collectionID, dsw.partitionID, dsw.segmentID)
	de.PayloadDataType = dsw.fieldSchema.DataType
	de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(int(dsw.rw.writtenUncompressed)))
	if err := de.Write(w); err != nil {
		return err
	}
	// Write event header
	eh := newEventHeader(DeleteEventType)
	// Write event data
	ev := newDeleteEventData()
	ev.StartTimestamp = 1
	ev.EndTimestamp = 1
	eh.EventLength = int32(dsw.buf.Len()) + eh.GetMemoryUsageInBytes() + int32(binary.Size(ev))
	// eh.NextPosition = eh.EventLength + w.Offset()
	if err := eh.Write(w); err != nil {
		return err
	}
	if err := ev.WriteEventData(w); err != nil {
		return err
	}
	return nil
}

func newDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID) *DeltalogStreamWriter {
	return &DeltalogStreamWriter{
		collectionID: collectionID,
		partitionID:  partitionID,
		segmentID:    segmentID,
		fieldSchema: &schemapb.FieldSchema{
			FieldID:  common.RowIDField,
			Name:     "delta",
			DataType: schemapb.DataType_String,
		},
	}
}

func newDeltalogSerializeWriter(eventWriter *DeltalogStreamWriter, batchSize int) (*SerializeWriterImpl[*DeleteLog], error) {
	rws := make(map[FieldID]RecordWriter, 1)
	rw, err := eventWriter.GetRecordWriter()
	if err != nil {
		return nil, err
	}
	rws[0] = rw
	compositeRecordWriter := NewCompositeRecordWriter(rws)
	return NewSerializeRecordWriter(compositeRecordWriter, func(v []*DeleteLog) (Record, error) {
		builder := array.NewBuilder(memory.DefaultAllocator, arrow.BinaryTypes.String)

		for _, vv := range v {
			strVal, err := json.Marshal(vv)
			if err != nil {
				return nil, err
			}

			builder.AppendValueFromString(string(strVal))
		}
		arr := []arrow.Array{builder.NewArray()}
		field := []arrow.Field{{
			Name:     "delta",
			Type:     arrow.BinaryTypes.String,
			Nullable: false,
		}}
		field2Col := map[FieldID]int{
			0: 0,
		}
		return NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(field, nil), arr, int64(len(v))), field2Col), nil
	}, batchSize), nil
}

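// Illustrative write flow for the JSON deltalog format (not part of the
// original file; the IDs, batch size, and error handling below are
// hypothetical, the identifiers are the ones defined in this file):
//
//	eventWriter := newDeltalogStreamWriter(collectionID, partitionID, segmentID)
//	writer, err := newDeltalogSerializeWriter(eventWriter, 1024)
//	// handle err, then feed DeleteLogs in batches:
//	//   writer.WriteValue(deleteLog) for each entry, writer.Close() when done
//	blob, err := eventWriter.Finalize() // headers + buffered JSON payload
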
var _ RecordReader = (*simpleArrowRecordReader)(nil)

// simpleArrowRecordReader reads simple arrow records from blobs
type simpleArrowRecordReader struct {
	blobs []*Blob

	blobPos int
	rr      array.RecordReader
	closer  func()

	r simpleArrowRecord
}

func (crr *simpleArrowRecordReader) iterateNextBatch() error {
	if crr.closer != nil {
		crr.closer()
	}

	crr.blobPos++
	if crr.blobPos >= len(crr.blobs) {
		return io.EOF
	}

	reader, err := NewBinlogReader(crr.blobs[crr.blobPos].Value)
	if err != nil {
		return err
	}

	er, err := reader.NextEventReader()
	if err != nil {
		return err
	}
	rr, err := er.GetArrowRecordReader()
	if err != nil {
		return err
	}
	crr.rr = rr
	crr.closer = func() {
		crr.rr.Release()
		er.Close()
		reader.Close()
	}

	return nil
}

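// Next returns the next record, advancing to the following blob (via
// iterateNextBatch) when the current Arrow record reader is exhausted, and
// io.EOF once all blobs have been consumed.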
func (crr *simpleArrowRecordReader) Next() (Record, error) {
	if crr.rr == nil {
		if len(crr.blobs) == 0 {
			return nil, io.EOF
		}
		crr.blobPos = -1
		crr.r = simpleArrowRecord{
			field2Col: make(map[FieldID]int),
		}
		if err := crr.iterateNextBatch(); err != nil {
			return nil, err
		}
	}

	composeRecord := func() bool {
		if ok := crr.rr.Next(); !ok {
			return false
		}
		record := crr.rr.Record()
		for i := range record.Schema().Fields() {
			crr.r.field2Col[FieldID(i)] = i
		}
		crr.r.r = record
		return true
	}

	if ok := composeRecord(); !ok {
		if err := crr.iterateNextBatch(); err != nil {
			return nil, err
		}
		if ok := composeRecord(); !ok {
			return nil, io.EOF
		}
	}
	return &crr.r, nil
}

func (crr *simpleArrowRecordReader) SetNeededFields(_ typeutil.Set[int64]) {
	// no-op for simple arrow record reader
}

func (crr *simpleArrowRecordReader) Close() error {
	if crr.closer != nil {
		crr.closer()
	}
	return nil
}

func newSimpleArrowRecordReader(blobs []*Blob) (*simpleArrowRecordReader, error) {
	return &simpleArrowRecordReader{
		blobs: blobs,
	}, nil
}

// MultiFieldDeltalogStreamWriter writes deltalog in the new multi-field parquet format
type MultiFieldDeltalogStreamWriter struct {
	collectionID UniqueID
	partitionID  UniqueID
	segmentID    UniqueID
	pkType       schemapb.DataType

	buf bytes.Buffer
	rw  *multiFieldRecordWriter
}

func newMultiFieldDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID, pkType schemapb.DataType) *MultiFieldDeltalogStreamWriter {
	return &MultiFieldDeltalogStreamWriter{
		collectionID: collectionID,
		partitionID:  partitionID,
		segmentID:    segmentID,
		pkType:       pkType,
	}
}

func (dsw *MultiFieldDeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
	if dsw.rw != nil {
		return dsw.rw, nil
	}

	fieldIDs := []FieldID{common.RowIDField, common.TimeStampField} // Not used.
	fields := []arrow.Field{
		{
			Name:     "pk",
			Type:     serdeMap[dsw.pkType].arrowType(0, schemapb.DataType_None),
			Nullable: false,
		},
		{
			Name:     "ts",
			Type:     arrow.PrimitiveTypes.Int64,
			Nullable: false,
		},
	}

	rw, err := newMultiFieldRecordWriter(fieldIDs, fields, &dsw.buf)
	if err != nil {
		return nil, err
	}
	dsw.rw = rw
	return rw, nil
}

func (dsw *MultiFieldDeltalogStreamWriter) Finalize() (*Blob, error) {
	if dsw.rw == nil {
		return nil, io.ErrUnexpectedEOF
	}
	dsw.rw.Close()

	var b bytes.Buffer
	if err := dsw.writeDeltalogHeaders(&b); err != nil {
		return nil, err
	}
	if _, err := b.Write(dsw.buf.Bytes()); err != nil {
		return nil, err
	}
	return &Blob{
		Value:      b.Bytes(),
		RowNum:     int64(dsw.rw.numRows),
		MemorySize: int64(dsw.rw.writtenUncompressed),
	}, nil
}

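// writeDeltalogHeaders writes the same binlog framing as the JSON-format
// writer (magic number, descriptor event, delete event header and data), and
// additionally records the version=MultiField extra in the descriptor, which
// supportMultiFieldFormat uses to detect the multi-field format on read.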
func (dsw *MultiFieldDeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) error {
	// Write magic number
	if err := binary.Write(w, common.Endian, MagicNumber); err != nil {
		return err
	}
	// Write descriptor
	de := NewBaseDescriptorEvent(dsw.collectionID, dsw.partitionID, dsw.segmentID)
	de.PayloadDataType = schemapb.DataType_Int64
	de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(int(dsw.rw.writtenUncompressed)))
	de.descriptorEventData.AddExtra(version, MultiField)
	if err := de.Write(w); err != nil {
		return err
	}
	// Write event header
	eh := newEventHeader(DeleteEventType)
	// Write event data
	ev := newDeleteEventData()
	ev.StartTimestamp = 1
	ev.EndTimestamp = 1
	eh.EventLength = int32(dsw.buf.Len()) + eh.GetMemoryUsageInBytes() + int32(binary.Size(ev))
	// eh.NextPosition = eh.EventLength + w.Offset()
	if err := eh.Write(w); err != nil {
		return err
	}
	if err := ev.WriteEventData(w); err != nil {
		return err
	}
	return nil
}

func newDeltalogMultiFieldWriter(eventWriter *MultiFieldDeltalogStreamWriter, batchSize int) (*SerializeWriterImpl[*DeleteLog], error) {
	rw, err := eventWriter.GetRecordWriter()
	if err != nil {
		return nil, err
	}
	return NewSerializeRecordWriter[*DeleteLog](rw, func(v []*DeleteLog) (Record, error) {
		fields := []arrow.Field{
			{
				Name:     "pk",
				Type:     serdeMap[schemapb.DataType(v[0].PkType)].arrowType(0, schemapb.DataType_None),
				Nullable: false,
			},
			{
				Name:     "ts",
				Type:     arrow.PrimitiveTypes.Int64,
				Nullable: false,
			},
		}
		arrowSchema := arrow.NewSchema(fields, nil)
		builder := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
		defer builder.Release()

		pkType := schemapb.DataType(v[0].PkType)
		switch pkType {
		case schemapb.DataType_Int64:
			pb := builder.Field(0).(*array.Int64Builder)
			for _, vv := range v {
				pk := vv.Pk.GetValue().(int64)
				pb.Append(pk)
			}
		case schemapb.DataType_VarChar:
			pb := builder.Field(0).(*array.StringBuilder)
			for _, vv := range v {
				pk := vv.Pk.GetValue().(string)
				pb.Append(pk)
			}
		default:
			return nil, fmt.Errorf("unexpected pk type %v", v[0].PkType)
		}

		for _, vv := range v {
			builder.Field(1).(*array.Int64Builder).Append(int64(vv.Ts))
		}

		arr := []arrow.Array{builder.Field(0).NewArray(), builder.Field(1).NewArray()}

		field2Col := map[FieldID]int{
			common.RowIDField:     0,
			common.TimeStampField: 1,
		}
		return NewSimpleArrowRecord(array.NewRecord(arrowSchema, arr, int64(len(v))), field2Col), nil
	}, batchSize), nil
}

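// Illustrative write flow for the multi-field (parquet) deltalog format (not
// part of the original file; the IDs, pk type, batch size, and error handling
// are hypothetical):
//
//	eventWriter := newMultiFieldDeltalogStreamWriter(collectionID, partitionID, segmentID, schemapb.DataType_Int64)
//	writer, err := newDeltalogMultiFieldWriter(eventWriter, 1024)
//	// handle err, write DeleteLogs with writer.WriteValue(...), then writer.Close()
//	blob, err := eventWriter.Finalize()
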
func newDeltalogMultiFieldReader(blobs []*Blob) (*DeserializeReaderImpl[*DeleteLog], error) {
	reader, err := newSimpleArrowRecordReader(blobs)
	if err != nil {
		return nil, err
	}
	return NewDeserializeReader(reader, func(r Record, v []*DeleteLog) error {
		rec, ok := r.(*simpleArrowRecord)
		if !ok {
			return errors.New("can not cast to simple arrow record")
		}
		fields := rec.r.Schema().Fields()
		switch fields[0].Type.ID() {
		case arrow.INT64:
			arr := r.Column(0).(*array.Int64)
			for j := 0; j < r.Len(); j++ {
				if v[j] == nil {
					v[j] = &DeleteLog{}
				}
				v[j].Pk = NewInt64PrimaryKey(arr.Value(j))
			}
		case arrow.STRING:
			arr := r.Column(0).(*array.String)
			for j := 0; j < r.Len(); j++ {
				if v[j] == nil {
					v[j] = &DeleteLog{}
				}
				v[j].Pk = NewVarCharPrimaryKey(arr.Value(j))
			}
		default:
			return fmt.Errorf("unexpected delta log pkType %v", fields[0].Type.Name())
		}

		arr := r.Column(1).(*array.Int64)
		for j := 0; j < r.Len(); j++ {
			v[j].Ts = uint64(arr.Value(j))
		}
		return nil
	}), nil
}

// newDeltalogDeserializeReader is the entry point for the delta log reader.
// It dispatches to newDeltalogOneFieldReader for the existing single-column log format,
// and to newDeltalogMultiFieldReader for the new format that supports multiple fields in a log file.
func newDeltalogDeserializeReader(blobs []*Blob) (*DeserializeReaderImpl[*DeleteLog], error) {
	if supportMultiFieldFormat(blobs) {
		return newDeltalogMultiFieldReader(blobs)
	}
	return newDeltalogOneFieldReader(blobs)
}

// supportMultiFieldFormat checks the delta log descriptor data to see whether
// it uses the format with separate pk and ts columns
func supportMultiFieldFormat(blobs []*Blob) bool {
	if len(blobs) > 0 {
		reader, err := NewBinlogReader(blobs[0].Value)
		if err != nil {
			return false
		}
		defer reader.Close()
		version := reader.descriptorEventData.Extras[version]
		return version != nil && version.(string) == MultiField
	}
	return false
}

// CreateDeltalogReader creates a deltalog reader based on the format version
func CreateDeltalogReader(blobs []*Blob) (*DeserializeReaderImpl[*DeleteLog], error) {
	return newDeltalogDeserializeReader(blobs)
}

// createDeltalogWriter creates a deltalog writer based on the configured format
func createDeltalogWriter(collectionID, partitionID, segmentID UniqueID, pkType schemapb.DataType, batchSize int,
) (*SerializeWriterImpl[*DeleteLog], func() (*Blob, error), error) {
	format := paramtable.Get().DataNodeCfg.DeltalogFormat.GetValue()
	switch format {
	case "json":
		eventWriter := newDeltalogStreamWriter(collectionID, partitionID, segmentID)
		writer, err := newDeltalogSerializeWriter(eventWriter, batchSize)
		return writer, eventWriter.Finalize, err
	case "parquet":
		eventWriter := newMultiFieldDeltalogStreamWriter(collectionID, partitionID, segmentID, pkType)
		writer, err := newDeltalogMultiFieldWriter(eventWriter, batchSize)
		return writer, eventWriter.Finalize, err
	default:
		return nil, nil, merr.WrapErrParameterInvalid("unsupported deltalog format %s", format)
	}
}

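// Illustrative use of the writer/finalizer pair returned by
// createDeltalogWriter (not part of the original file; IDs, pk type, batch
// size, and error handling are hypothetical):
//
//	writer, finalize, err := createDeltalogWriter(collectionID, partitionID, segmentID, schemapb.DataType_Int64, 4096)
//	// write DeleteLogs with writer.WriteValue(...), then writer.Close()
//	blob, err := finalize() // framed blob ready for upload
//
// LegacyDeltalogWriter below implements this flow behind the RecordWriter
// interface.

// LegacyDeltalogWriter converts incoming Arrow records into DeleteLogs,
// writes them through the format-specific serialize writer, and uploads the
// finalized blob on Close via the provided uploader.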
type LegacyDeltalogWriter struct {
	path                string
	pkType              schemapb.DataType
	writer              *SerializeWriterImpl[*DeleteLog]
	finalizer           func() (*Blob, error)
	writtenUncompressed uint64

	uploader uploaderFn
}

var _ RecordWriter = (*LegacyDeltalogWriter)(nil)

func NewLegacyDeltalogWriter(
	collectionID, partitionID, segmentID, logID UniqueID, pkType schemapb.DataType, uploader uploaderFn, path string,
) (*LegacyDeltalogWriter, error) {
	writer, finalizer, err := createDeltalogWriter(collectionID, partitionID, segmentID, pkType, 4096)
	if err != nil {
		return nil, err
	}

	return &LegacyDeltalogWriter{
		path:      path,
		pkType:    pkType,
		writer:    writer,
		finalizer: finalizer,
		uploader:  uploader,
	}, nil
}

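// Write converts each row of the record into a DeleteLog (primary key from
// column 0, timestamp from column 1), forwards it to the underlying serialize
// writer, and accumulates the uncompressed size of both columns.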
func (w *LegacyDeltalogWriter) Write(rec Record) error {
	newDeleteLog := func(i int) (*DeleteLog, error) {
		ts := Timestamp(rec.Column(1).(*array.Int64).Value(i))
		switch w.pkType {
		case schemapb.DataType_Int64:
			pk := NewInt64PrimaryKey(rec.Column(0).(*array.Int64).Value(i))
			return NewDeleteLog(pk, ts), nil
		case schemapb.DataType_VarChar:
			pk := NewVarCharPrimaryKey(rec.Column(0).(*array.String).Value(i))
			return NewDeleteLog(pk, ts), nil
		default:
			return nil, fmt.Errorf("unexpected pk type %v", w.pkType)
		}
	}

	for i := range rec.Len() {
		deleteLog, err := newDeleteLog(i)
		if err != nil {
			return err
		}
		err = w.writer.WriteValue(deleteLog)
		if err != nil {
			return err
		}
	}
	w.writtenUncompressed += (rec.Column(0).Data().SizeInBytes() + rec.Column(1).Data().SizeInBytes())
	return nil
}

func (w *LegacyDeltalogWriter) Close() error {
	err := w.writer.Close()
	if err != nil {
		return err
	}
	blob, err := w.finalizer()
	if err != nil {
		return err
	}

	return w.uploader(context.Background(), map[string][]byte{blob.Key: blob.Value})
}

func (w *LegacyDeltalogWriter) GetWrittenUncompressed() uint64 {
	return w.writtenUncompressed
}

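// NewLegacyDeltalogReader returns a RecordReader over the given deltalog
// paths, downloading one path per batch and decoding it with the composite
// binlog record reader using a two-field schema: the provided primary key
// field and the timestamp field.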
func NewLegacyDeltalogReader(pkField *schemapb.FieldSchema, downloader downloaderFn, paths []string) (RecordReader, error) {
	schema := &schemapb.CollectionSchema{
		Fields: []*schemapb.FieldSchema{
			pkField,
			{
				FieldID:  common.TimeStampField,
				DataType: schemapb.DataType_Int64,
			},
		},
	}

	chunkPos := 0
	blobsReader := func() ([]*Blob, error) {
		// Signal the end of the stream once every path has been consumed,
		// matching the io.EOF convention used by the other blob readers.
		if chunkPos >= len(paths) {
			return nil, io.EOF
		}
		path := paths[chunkPos]
		chunkPos++
		blobs, err := downloader(context.Background(), []string{path})
		if err != nil {
			return nil, err
		}
		return []*Blob{{Key: path, Value: blobs[0]}}, nil
	}

	return newIterativeCompositeBinlogRecordReader(
		schema,
		nil,
		blobsReader,
		nil,
	), nil
}