diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go
index 631acf0eb7..de94a71a9e 100644
--- a/internal/storage/data_codec_test.go
+++ b/internal/storage/data_codec_test.go
@@ -59,7 +59,7 @@ const (
 )
 
 func assertTestData(t *testing.T, i int, value *Value) {
-	assertTestDataInternal(t, i, value, true)
+	assertTestDataInternal(t, i, value, false)
 }
 
 // Verify value of index i (1-based numbering) in data generated by generateTestData
@@ -130,9 +130,7 @@ func assertTestAddedFieldData(t *testing.T, i int, value *Value) {
 			},
 		},
 	}
-	f18b, err := proto.Marshal(f18)
-	assert.Nil(t, err)
-	return f18b
+	return f18
 }
 
 	f102 := make([]float32, 8)
diff --git a/internal/storage/rw.go b/internal/storage/rw.go
index 8e759c98e7..918f32129b 100644
--- a/internal/storage/rw.go
+++ b/internal/storage/rw.go
@@ -105,6 +105,13 @@ func WithColumnGroups(columnGroups []storagecommon.ColumnGroup) RwOption {
 	}
 }
 
+func GuessStorageVersion(binlogs []*datapb.FieldBinlog, schema *schemapb.CollectionSchema) int64 {
+	if len(binlogs) == len(schema.Fields) {
+		return StorageV1
+	}
+	return StorageV2
+}
+
 func makeBlobsReader(ctx context.Context, binlogs []*datapb.FieldBinlog, downloader downloaderFn) (ChunkedBlobsReader, error) {
 	if len(binlogs) == 0 {
 		return func() ([]*Blob, error) {
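
The GuessStorageVersion helper added above relies on a count invariant: storage v1 writes exactly one FieldBinlog per schema field, while storage v2 packs columns into shared column groups, so the counts diverge. A minimal standalone sketch of the heuristic, with illustrative stand-in constants in place of the real storage.StorageV1/StorageV2 values and plain counts in place of the datapb/schemapb types:

package main

import "fmt"

// Illustrative stand-ins for storage.StorageV1/StorageV2; the real values
// live in the storage package and are not assumed here.
const (
	storageV1 int64 = 1
	storageV2 int64 = 2
)

// guessStorageVersion mirrors the heuristic in rw.go, reduced to counts:
// a v1 segment has exactly one FieldBinlog per schema field, so any other
// count implies v2 column groups.
func guessStorageVersion(numBinlogs, numFields int) int64 {
	if numBinlogs == numFields {
		return storageV1
	}
	return storageV2
}

func main() {
	fmt.Println(guessStorageVersion(6, 6)) // one binlog group per field -> v1
	fmt.Println(guessStorageVersion(2, 6)) // fewer groups than fields -> v2
}

The heuristic is best-effort by construction: a v2 segment that happened to produce one column group per field would be classified as v1.
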
diff --git a/internal/storage/serde.go b/internal/storage/serde.go
index 0bb02165ac..e15b2b1b1a 100644
--- a/internal/storage/serde.go
+++ b/internal/storage/serde.go
@@ -74,10 +74,7 @@ func (r *compositeRecord) Column(i FieldID) arrow.Array {
 }
 
 func (r *compositeRecord) Len() int {
-	for _, rec := range r.recs {
-		return rec.Len()
-	}
-	return 0
+	return r.recs[0].Len()
 }
 
 func (r *compositeRecord) Release() {
@@ -396,7 +393,7 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 		},
 	}
 
-	m[schemapb.DataType_Array] = byteEntry
+	m[schemapb.DataType_Array] = eagerArrayEntry
 	m[schemapb.DataType_JSON] = byteEntry
 
 	fixedSizeDeserializer := func(a arrow.Array, i int) (any, bool) {
@@ -447,7 +444,21 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 		func(i int) arrow.DataType {
 			return &arrow.FixedSizeBinaryType{ByteWidth: i}
 		},
-		fixedSizeDeserializer,
+		func(a arrow.Array, i int) (any, bool) {
+			if a.IsNull(i) {
+				return nil, true
+			}
+			if arr, ok := a.(*array.FixedSizeBinary); ok && i < arr.Len() {
+				// convert to []int8
+				bytes := arr.Value(i)
+				int8s := make([]int8, len(bytes))
+				for i, b := range bytes {
+					int8s[i] = int8(b)
+				}
+				return int8s, true
+			}
+			return nil, false
+		},
 		fixedSizeSerializer,
 	}
 	m[schemapb.DataType_FloatVector] = serdeEntry{
@@ -520,7 +531,7 @@ type DeserializeReaderImpl[T any] struct {
 // Iterate to next value, return error or EOF if no more value.
 func (deser *DeserializeReaderImpl[T]) NextValue() (*T, error) {
-	if deser.rec == nil || deser.pos >= deser.rec.Len()-1 {
+	if deser.pos == 0 || deser.pos >= len(deser.values) {
 		r, err := deser.rr.Next()
 		if err != nil {
 			return nil, err
@@ -533,11 +544,10 @@ func (deser *DeserializeReaderImpl[T]) NextValue() (*T, error) {
 		if err := deser.deserializer(deser.rec, deser.values); err != nil {
 			return nil, err
 		}
-	} else {
-		deser.pos++
 	}
-
-	return &deser.values[deser.pos], nil
+	ret := &deser.values[deser.pos]
+	deser.pos++
+	return ret, nil
 }
 
 func (deser *DeserializeReaderImpl[T]) Close() error {
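
The reworked NextValue drops the look-ahead arithmetic on rec.Len() in favor of a plain cursor: hand out values[pos] until the batch is exhausted, then pull and deserialize the next record; the pos == 0 test merely bootstraps the very first read. A self-contained sketch of that cursor pattern, with a slice of slices standing in for the record reader and illustrative names (batchReader, nextValue) rather than the Milvus API:

package main

import (
	"errors"
	"fmt"
	"io"
)

// batchReader keeps a cursor over the current batch of deserialized values
// and refills from the source only when the cursor runs off the end -- the
// same shape as the reworked DeserializeReaderImpl.NextValue.
type batchReader[T any] struct {
	source [][]T // stand-in for the record reader feeding the deserializer
	values []T   // current deserialized batch
	pos    int   // cursor into values
}

func (r *batchReader[T]) nextValue() (*T, error) {
	// A nil or exhausted batch triggers a refill; a loop (rather than the
	// single if of the original) also tolerates empty batches.
	for r.pos >= len(r.values) {
		if len(r.source) == 0 {
			return nil, io.EOF
		}
		r.values, r.source = r.source[0], r.source[1:]
		r.pos = 0
	}
	ret := &r.values[r.pos] // return the current value, then advance
	r.pos++
	return ret, nil
}

func main() {
	r := &batchReader[int]{source: [][]int{{1, 2}, {3}}}
	for {
		v, err := r.nextValue()
		if errors.Is(err, io.EOF) {
			break
		}
		fmt.Println(*v) // 1, 2, 3
	}
}

Returning first and advancing afterwards removes the rec == nil special case and the Len()-1 bookkeeping of the old version without changing the values handed out.
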
diff --git a/internal/util/importutilv2/binlog/field_reader.go b/internal/util/importutilv2/binlog/field_reader.go
deleted file mode 100644
index c9bef62ff2..0000000000
--- a/internal/util/importutilv2/binlog/field_reader.go
+++ /dev/null
@@ -1,62 +0,0 @@
-// Licensed to the LF AI & Data foundation under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package binlog
-
-import (
-	"context"
-
-	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
-	"github.com/milvus-io/milvus/internal/storage"
-)
-
-type fieldReader struct {
-	reader      *storage.BinlogReader
-	fieldSchema *schemapb.FieldSchema
-}
-
-func newFieldReader(ctx context.Context, cm storage.ChunkManager, fieldSchema *schemapb.FieldSchema, path string) (*fieldReader, error) {
-	reader, err := newBinlogReader(ctx, cm, path)
-	if err != nil {
-		return nil, err
-	}
-	return &fieldReader{
-		reader:      reader,
-		fieldSchema: fieldSchema,
-	}, nil
-}
-
-func (r *fieldReader) Next() (storage.FieldData, error) {
-	fieldData, err := storage.NewFieldData(r.fieldSchema.GetDataType(), r.fieldSchema, 0)
-	if err != nil {
-		return nil, err
-	}
-	rowsSet, validDataRows, err := readData(r.reader, storage.InsertEventType)
-	if err != nil {
-		return nil, err
-	}
-	for i, rows := range rowsSet {
-		err = fieldData.AppendRows(rows, validDataRows[i])
-		if err != nil {
-			return nil, err
-		}
-	}
-	return fieldData, nil
-}
-
-func (r *fieldReader) Close() {
-	r.reader.Close()
-}
diff --git a/internal/util/importutilv2/binlog/reader.go b/internal/util/importutilv2/binlog/reader.go
index 4767e34005..65fd87b781 100644
--- a/internal/util/importutilv2/binlog/reader.go
+++ b/internal/util/importutilv2/binlog/reader.go
@@ -29,6 +29,7 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/v2/log"
+	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/v2/util/merr"
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
@@ -40,10 +41,10 @@ type reader struct {
 
 	fileSize   *atomic.Int64
 	deleteData map[any]typeutil.Timestamp // pk2ts
-	insertLogs map[int64][]string         // fieldID -> binlogs
+	insertLogs map[int64][]string         // fieldID (or fieldGroupID if storage v2) -> binlogs
 
-	readIdx int
 	filters []Filter
+	dr      storage.DeserializeReader[*storage.Value]
 }
 
 func NewReader(ctx context.Context,
@@ -53,7 +54,16 @@ func NewReader(ctx context.Context,
 	tsStart, tsEnd uint64,
 ) (*reader, error) {
-	schema = typeutil.AppendSystemFields(schema)
+	systemFieldsAbsent := true
+	for _, field := range schema.Fields {
+		if field.GetFieldID() < 100 {
+			systemFieldsAbsent = false
+			break
+		}
+	}
+	if systemFieldsAbsent {
+		schema = typeutil.AppendSystemFields(schema)
+	}
 	r := &reader{
 		ctx: ctx,
 		cm:  cm,
@@ -88,6 +98,33 @@ func (r *reader) init(paths []string, tsStart, tsEnd uint64) error {
 	}
 	r.insertLogs = insertLogs
 
+	binlogs := lo.Map(r.schema.Fields, func(field *schemapb.FieldSchema, _ int) *datapb.FieldBinlog {
+		id := field.GetFieldID()
+		return &datapb.FieldBinlog{
+			FieldID: id,
+			Binlogs: lo.Map(r.insertLogs[id], func(path string, _ int) *datapb.Binlog {
+				return &datapb.Binlog{
+					LogPath: path,
+				}
+			}),
+		}
+	})
+
+	storageVersion := storage.GuessStorageVersion(binlogs, r.schema)
+	rr, err := storage.NewBinlogRecordReader(r.ctx, binlogs, r.schema,
+		storage.WithVersion(storageVersion),
+		storage.WithBufferSize(32*1024*1024),
+		storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
+			return r.cm.MultiRead(ctx, paths)
+		}),
+	)
+	if err != nil {
+		return err
+	}
+	r.dr = storage.NewDeserializeReader(rr, func(record storage.Record, v []*storage.Value) error {
+		return storage.ValueDeserializer(record, v, r.schema.Fields)
+	})
+
 	if len(paths) < 2 {
 		return nil
 	}
@@ -155,35 +192,39 @@ func (r *reader) Read() (*storage.InsertData, error) {
 	if err != nil {
 		return nil, err
 	}
-	if r.readIdx == len(r.insertLogs[0]) {
-		// In the binlog import scenario, all data may be filtered out
-		// due to time range or deletions. Therefore, we use io.EOF as
-		// the indicator of the read end, instead of InsertData with 0 rows.
-		return nil, io.EOF
-	}
-	for fieldID, binlogs := range r.insertLogs {
-		field := typeutil.GetField(r.schema, fieldID)
-		if field == nil {
-			return nil, merr.WrapErrFieldNotFound(fieldID)
+
+	for range 4096 {
+		v, err := r.dr.NextValue()
+		if err == io.EOF {
+			if insertData.GetRowNum() == 0 {
+				return nil, io.EOF
+			}
+			break
 		}
-		path := binlogs[r.readIdx]
-		fr, err := newFieldReader(r.ctx, r.cm, field, path)
 		if err != nil {
 			return nil, err
 		}
-		fieldData, err := fr.Next()
-		if err != nil {
-			fr.Close()
-			return nil, err
+		// convert record to fieldData
+		for _, field := range r.schema.Fields {
+			fieldData := insertData.Data[field.GetFieldID()]
+			if fieldData == nil {
+				fieldData, err = storage.NewFieldData(field.GetDataType(), field, 1024)
+				if err != nil {
+					return nil, err
+				}
+				insertData.Data[field.GetFieldID()] = fieldData
+			}
+
+			err := fieldData.AppendRow((*v).Value.(map[int64]any)[field.GetFieldID()])
+			if err != nil {
+				return nil, err
+			}
 		}
-		fr.Close()
-		insertData.Data[field.GetFieldID()] = fieldData
 	}
 	insertData, err = r.filter(insertData)
 	if err != nil {
 		return nil, err
 	}
-	r.readIdx++
 	return insertData, nil
 }
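
With the per-field fieldReader gone, Read pivots the row-oriented values back into columns: each storage.Value carries a map from field ID to cell, and up to 4096 of them per call are appended row by row into per-field FieldData buffers. A simplified sketch of that pivot, with plain maps and slices standing in for storage.Value and storage.FieldData (row and pivot are illustrative names):

package main

import "fmt"

// row stands in for storage.Value, whose Value field holds a map from
// field ID to that row's cell; []any stands in for storage.FieldData.
type row map[int64]any

// pivot appends row-oriented values column by column, the way the new
// Read() fans each deserialized value out into per-field FieldData.
func pivot(rows []row, fieldIDs []int64) map[int64][]any {
	columns := make(map[int64][]any, len(fieldIDs))
	for _, r := range rows {
		for _, id := range fieldIDs {
			columns[id] = append(columns[id], r[id])
		}
	}
	return columns
}

func main() {
	rows := []row{
		{100: "a", 101: int64(1)},
		{100: "b", 101: int64(2)},
	}
	fmt.Println(pivot(rows, []int64{100, 101}))
	// map[100:[a b] 101:[1 2]]
}

Capping each call at 4096 rows keeps a single Read bounded, while the io.EOF-on-zero-rows check preserves the old contract that end of stream is signaled by io.EOF rather than by an empty InsertData.
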
diff --git a/internal/util/importutilv2/binlog/reader_test.go b/internal/util/importutilv2/binlog/reader_test.go
index 16734c73ae..9a021a9cd3 100644
--- a/internal/util/importutilv2/binlog/reader_test.go
+++ b/internal/util/importutilv2/binlog/reader_test.go
@@ -225,6 +225,18 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 	}
 	schema := &schemapb.CollectionSchema{
 		Fields: []*schemapb.FieldSchema{
+			{
+				FieldID:      int64(common.RowIDField),
+				Name:         common.RowIDFieldName,
+				IsPrimaryKey: false,
+				DataType:     schemapb.DataType_Int64,
+			},
+			{
+				FieldID:      int64(common.TimeStampField),
+				Name:         common.TimeStampFieldName,
+				IsPrimaryKey: false,
+				DataType:     schemapb.DataType_Int64,
+			},
 			{
 				FieldID:      100,
 				Name:         "pk",
@@ -252,7 +264,6 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 		},
 	}
 	cm := mocks.NewChunkManager(suite.T())
-	schema = typeutil.AppendSystemFields(schema)
 
 	originalInsertData, err := testutil.CreateInsertData(schema, suite.numRows)
 	suite.NoError(err)
@@ -276,12 +287,15 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 		}
 		return nil
 	})
-	for fieldID, paths := range insertBinlogs {
-		field := typeutil.GetField(schema, fieldID)
-		suite.NotNil(field)
-		buf0 := createBinlogBuf(suite.T(), field, originalInsertData.Data[fieldID])
-		cm.EXPECT().Read(mock.Anything, paths[0]).Return(buf0, nil)
+	var (
+		paths = make([]string, 0)
+		bytes = make([][]byte, 0)
+	)
+	for _, field := range schema.Fields {
+		paths = append(paths, insertBinlogs[field.GetFieldID()][0])
+		bytes = append(bytes, createBinlogBuf(suite.T(), field, originalInsertData.Data[field.GetFieldID()]))
 	}
+	cm.EXPECT().MultiRead(mock.Anything, paths).Return(bytes, nil)
 
 	if len(suite.deletePKs) != 0 {
 		for _, path := range deltaLogs {