feat: bulk insert support storage v2 (#41843)

See #39173

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
Authored by Ted Xu on 2025-05-19 10:34:24 +08:00; committed by GitHub.
parent 59dff668dc
commit 7660be0993
6 changed files with 113 additions and 105 deletions


@@ -59,7 +59,7 @@ const (
 )
 
 func assertTestData(t *testing.T, i int, value *Value) {
-	assertTestDataInternal(t, i, value, true)
+	assertTestDataInternal(t, i, value, false)
 }
 
 // Verify value of index i (1-based numbering) in data generated by generateTestData
@@ -130,9 +130,7 @@ func assertTestAddedFieldData(t *testing.T, i int, value *Value) {
 				},
 			},
 		}
-		f18b, err := proto.Marshal(f18)
-		assert.Nil(t, err)
-		return f18b
+		return f18
 	}
 
 	f102 := make([]float32, 8)


@@ -105,6 +105,13 @@ func WithColumnGroups(columnGroups []storagecommon.ColumnGroup) RwOption {
 	}
 }
 
+func GuessStorageVersion(binlogs []*datapb.FieldBinlog, schema *schemapb.CollectionSchema) int64 {
+	if len(binlogs) == len(schema.Fields) {
+		return StorageV1
+	}
+	return StorageV2
+}
+
 func makeBlobsReader(ctx context.Context, binlogs []*datapb.FieldBinlog, downloader downloaderFn) (ChunkedBlobsReader, error) {
 	if len(binlogs) == 0 {
 		return func() ([]*Blob, error) {
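Note on the heuristic: the v1 layout writes one binlog group per field, so the FieldBinlog count equals the schema's field count, while v2 packs several fields into column groups and therefore carries fewer entries than fields. A minimal usage sketch, mirroring the call site added in the import reader below (`binlogs`, `schema`, and `ctx` are assumed to be in scope):

	version := storage.GuessStorageVersion(binlogs, schema)
	rr, err := storage.NewBinlogRecordReader(ctx, binlogs, schema,
		storage.WithVersion(version), // StorageV1 or StorageV2
	)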


@@ -74,10 +74,7 @@ func (r *compositeRecord) Column(i FieldID) arrow.Array {
 }
 
 func (r *compositeRecord) Len() int {
-	for _, rec := range r.recs {
-		return rec.Len()
-	}
-	return 0
+	return r.recs[0].Len()
 }
 
 func (r *compositeRecord) Release() {
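Note: the old Len ranged over recs only to return the first entry it happened to visit; the rewrite indexes entry 0 directly, which presumes a compositeRecord always holds at least one record and that all of its records share the same row count.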
@@ -396,7 +393,7 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 		},
 	}
 
-	m[schemapb.DataType_Array] = byteEntry
+	m[schemapb.DataType_Array] = eagerArrayEntry
 	m[schemapb.DataType_JSON] = byteEntry
 
 	fixedSizeDeserializer := func(a arrow.Array, i int) (any, bool) {
@@ -447,7 +444,21 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
 		func(i int) arrow.DataType {
 			return &arrow.FixedSizeBinaryType{ByteWidth: i}
 		},
-		fixedSizeDeserializer,
+		func(a arrow.Array, i int) (any, bool) {
+			if a.IsNull(i) {
+				return nil, true
+			}
+			if arr, ok := a.(*array.FixedSizeBinary); ok && i < arr.Len() {
+				// convert to []int8
+				bytes := arr.Value(i)
+				int8s := make([]int8, len(bytes))
+				for i, b := range bytes {
+					int8s[i] = int8(b)
+				}
+				return int8s, true
+			}
+			return nil, false
+		},
 		fixedSizeSerializer,
 	}
 	m[schemapb.DataType_FloatVector] = serdeEntry{
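Instead of the generic fixed-size handler, Int8Vector cells are now materialized as []int8, reinterpreting each stored byte as a signed two's-complement value. A small standalone illustration (assumed example values):

	bytes := []byte{0x01, 0x7F, 0xFF}
	int8s := make([]int8, len(bytes))
	for i, b := range bytes {
		int8s[i] = int8(b) // 0xFF reinterprets as -1
	}
	// int8s == []int8{1, 127, -1}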
@@ -520,7 +531,7 @@ type DeserializeReaderImpl[T any] struct {
 
 // Iterate to next value, return error or EOF if no more value.
 func (deser *DeserializeReaderImpl[T]) NextValue() (*T, error) {
-	if deser.rec == nil || deser.pos >= deser.rec.Len()-1 {
+	if deser.pos == 0 || deser.pos >= len(deser.values) {
 		r, err := deser.rr.Next()
 		if err != nil {
 			return nil, err
@@ -533,11 +544,10 @@ func (deser *DeserializeReaderImpl[T]) NextValue() (*T, error) {
 		if err := deser.deserializer(deser.rec, deser.values); err != nil {
 			return nil, err
 		}
-	} else {
-		deser.pos++
 	}
-
-	return &deser.values[deser.pos], nil
+	ret := &deser.values[deser.pos]
+	deser.pos++
+	return ret, nil
 }
 
 func (deser *DeserializeReaderImpl[T]) Close() error {
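After this change, pos always points at the next unconsumed slot and is advanced after the value is handed out, so the caller no longer needs first-call special casing. A consumption sketch under those semantics (`dr` and `use` are placeholder names; the importer's Read loop below follows the same pattern):

	for {
		v, err := dr.NextValue()
		if err == io.EOF {
			break // reader exhausted
		}
		if err != nil {
			return err
		}
		use(*v) // v points into an internal buffer reused when the next record loads
	}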


@@ -1,62 +0,0 @@
-// Licensed to the LF AI & Data foundation under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package binlog
-
-import (
-	"context"
-
-	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
-	"github.com/milvus-io/milvus/internal/storage"
-)
-
-type fieldReader struct {
-	reader      *storage.BinlogReader
-	fieldSchema *schemapb.FieldSchema
-}
-
-func newFieldReader(ctx context.Context, cm storage.ChunkManager, fieldSchema *schemapb.FieldSchema, path string) (*fieldReader, error) {
-	reader, err := newBinlogReader(ctx, cm, path)
-	if err != nil {
-		return nil, err
-	}
-	return &fieldReader{
-		reader:      reader,
-		fieldSchema: fieldSchema,
-	}, nil
-}
-
-func (r *fieldReader) Next() (storage.FieldData, error) {
-	fieldData, err := storage.NewFieldData(r.fieldSchema.GetDataType(), r.fieldSchema, 0)
-	if err != nil {
-		return nil, err
-	}
-	rowsSet, validDataRows, err := readData(r.reader, storage.InsertEventType)
-	if err != nil {
-		return nil, err
-	}
-	for i, rows := range rowsSet {
-		err = fieldData.AppendRows(rows, validDataRows[i])
-		if err != nil {
-			return nil, err
-		}
-	}
-	return fieldData, nil
-}
-
-func (r *fieldReader) Close() {
-	r.reader.Close()
-}
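With bulk insert now flowing through the shared storage.NewBinlogRecordReader path added in the import reader below, this per-field binlog reader has no remaining callers and is deleted; the record reader handles both storage v1 field binlogs and v2 column groups.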


@@ -29,6 +29,7 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/v2/log"
+	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/v2/util/merr"
 	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
 )
@@ -40,10 +41,10 @@ type reader struct {
 	fileSize *atomic.Int64
 	deleteData map[any]typeutil.Timestamp // pk2ts
-	insertLogs map[int64][]string // fieldID -> binlogs
-
-	readIdx int
+	insertLogs map[int64][]string // fieldID (or fieldGroupID if storage v2) -> binlogs
 
 	filters []Filter
+
+	dr storage.DeserializeReader[*storage.Value]
 }
 
 func NewReader(ctx context.Context,
@@ -53,7 +54,16 @@ func NewReader(ctx context.Context,
 	tsStart,
 	tsEnd uint64,
 ) (*reader, error) {
-	schema = typeutil.AppendSystemFields(schema)
+	systemFieldsAbsent := true
+	for _, field := range schema.Fields {
+		if field.GetFieldID() < 100 {
+			systemFieldsAbsent = false
+			break
+		}
+	}
+	if systemFieldsAbsent {
+		schema = typeutil.AppendSystemFields(schema)
+	}
 	r := &reader{
 		ctx: ctx,
 		cm:  cm,
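Field IDs below 100 are reserved for Milvus system fields (row ID and timestamp; user fields start at 100), so the loop appends the system fields only when the caller's schema does not already declare them, as the updated tests below now do. An equivalent check written against the named constant (assuming common.StartOfUserFieldID == 100 from pkg/common):

	systemFieldsAbsent := !lo.ContainsBy(schema.Fields, func(f *schemapb.FieldSchema) bool {
		return f.GetFieldID() < common.StartOfUserFieldID // IDs 0 and 1 are RowID and Timestamp
	})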
@@ -88,6 +98,33 @@ func (r *reader) init(paths []string, tsStart, tsEnd uint64) error {
 	}
 	r.insertLogs = insertLogs
 
+	binlogs := lo.Map(r.schema.Fields, func(field *schemapb.FieldSchema, _ int) *datapb.FieldBinlog {
+		id := field.GetFieldID()
+		return &datapb.FieldBinlog{
+			FieldID: id,
+			Binlogs: lo.Map(r.insertLogs[id], func(path string, _ int) *datapb.Binlog {
+				return &datapb.Binlog{
+					LogPath: path,
+				}
+			}),
+		}
+	})
+	storageVersion := storage.GuessStorageVersion(binlogs, r.schema)
+	rr, err := storage.NewBinlogRecordReader(r.ctx, binlogs, r.schema,
+		storage.WithVersion(storageVersion),
+		storage.WithBufferSize(32*1024*1024),
+		storage.WithDownloader(func(ctx context.Context, paths []string) ([][]byte, error) {
+			return r.cm.MultiRead(ctx, paths)
+		}),
+	)
+	if err != nil {
+		return err
+	}
+	r.dr = storage.NewDeserializeReader(rr, func(record storage.Record, v []*storage.Value) error {
+		return storage.ValueDeserializer(record, v, r.schema.Fields)
+	})
+
 	if len(paths) < 2 {
 		return nil
 	}
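Design note: a single columnar record reader replaces the former one-reader-per-field arrangement. The FieldBinlog list is rebuilt from insertLogs so GuessStorageVersion can pick the layout, reads are buffered at 32 MiB, and the injected downloader delegates to ChunkManager.MultiRead, giving v1 field binlogs and v2 column groups one shared I/O path.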
@@ -155,35 +192,39 @@ func (r *reader) Read() (*storage.InsertData, error) {
 	if err != nil {
 		return nil, err
 	}
-	if r.readIdx == len(r.insertLogs[0]) {
-		// In the binlog import scenario, all data may be filtered out
-		// due to time range or deletions. Therefore, we use io.EOF as
-		// the indicator of the read end, instead of InsertData with 0 rows.
-		return nil, io.EOF
-	}
-	for fieldID, binlogs := range r.insertLogs {
-		field := typeutil.GetField(r.schema, fieldID)
-		if field == nil {
-			return nil, merr.WrapErrFieldNotFound(fieldID)
-		}
-		path := binlogs[r.readIdx]
-		fr, err := newFieldReader(r.ctx, r.cm, field, path)
-		if err != nil {
-			return nil, err
-		}
-		fieldData, err := fr.Next()
-		if err != nil {
-			fr.Close()
-			return nil, err
-		}
-		fr.Close()
-		insertData.Data[field.GetFieldID()] = fieldData
-	}
+	for range 4096 {
+		v, err := r.dr.NextValue()
+		if err == io.EOF {
+			if insertData.GetRowNum() == 0 {
+				return nil, io.EOF
+			}
+			break
+		}
+		if err != nil {
+			return nil, err
+		}
+		// convert record to fieldData
+		for _, field := range r.schema.Fields {
+			fieldData := insertData.Data[field.GetFieldID()]
+			if fieldData == nil {
+				fieldData, err = storage.NewFieldData(field.GetDataType(), field, 1024)
+				if err != nil {
+					return nil, err
+				}
+				insertData.Data[field.GetFieldID()] = fieldData
+			}
+			err := fieldData.AppendRow((*v).Value.(map[int64]any)[field.GetFieldID()])
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
 	insertData, err = r.filter(insertData)
 	if err != nil {
 		return nil, err
 	}
-	r.readIdx++
 	return insertData, nil
 }
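Read now drains up to 4096 rows per call from the deserialize reader instead of one whole binlog per field, returning a partial batch when EOF is hit mid-loop. Each *storage.Value wraps its row as a map keyed by field ID, which is what the append step indexes. A hedged illustration (IDs follow Milvus's reserved numbering and the test schema below):

	row := (*v).Value.(map[int64]any) // one row, keyed by field ID
	rowID := row[0].(int64)           // system RowID field (ID 0)
	ts := row[1].(int64)              // system Timestamp field (ID 1)
	pk := row[100]                    // user field "pk" in the test schema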


@@ -225,6 +225,18 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType) {
 	}
 	schema := &schemapb.CollectionSchema{
 		Fields: []*schemapb.FieldSchema{
+			{
+				FieldID:      int64(common.RowIDField),
+				Name:         common.RowIDFieldName,
+				IsPrimaryKey: false,
+				DataType:     schemapb.DataType_Int64,
+			},
+			{
+				FieldID:      int64(common.TimeStampField),
+				Name:         common.TimeStampFieldName,
+				IsPrimaryKey: false,
+				DataType:     schemapb.DataType_Int64,
+			},
 			{
 				FieldID: 100,
 				Name:    "pk",
@@ -252,7 +264,6 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType) {
 		},
 	}
 	cm := mocks.NewChunkManager(suite.T())
-	schema = typeutil.AppendSystemFields(schema)
 
 	originalInsertData, err := testutil.CreateInsertData(schema, suite.numRows)
 	suite.NoError(err)
@@ -276,12 +287,15 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType) {
 		}
 		return nil
 	})
-	for fieldID, paths := range insertBinlogs {
-		field := typeutil.GetField(schema, fieldID)
-		suite.NotNil(field)
-		buf0 := createBinlogBuf(suite.T(), field, originalInsertData.Data[fieldID])
-		cm.EXPECT().Read(mock.Anything, paths[0]).Return(buf0, nil)
+	var (
+		paths = make([]string, 0)
+		bytes = make([][]byte, 0)
+	)
+	for _, field := range schema.Fields {
+		paths = append(paths, insertBinlogs[field.GetFieldID()][0])
+		bytes = append(bytes, createBinlogBuf(suite.T(), field, originalInsertData.Data[field.GetFieldID()]))
 	}
+	cm.EXPECT().MultiRead(mock.Anything, paths).Return(bytes, nil)
 
 	if len(suite.deletePKs) != 0 {
 		for _, path := range deltaLogs {
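Accordingly, the suite now records a single MultiRead expectation covering the first binlog of every field, in schema order, instead of one Read expectation per field; this matches the downloader wired into the new record reader.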