feat: compaction to support add field (#40415)

See: #39718

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
Authored by Ted Xu on 2025-03-18 11:32:12 +08:00, committed by GitHub
parent f4f81b7d0c
commit be86d31ea3
12 changed files with 555 additions and 226 deletions

View File

@@ -1,18 +1,5 @@
run:
go: "1.21"
skip-dirs:
- build
- configs
- deployments
- docs
- scripts
- internal/core
- cmake_build
- mmap
- data
- ci
skip-files:
- partial_search_test.go
go: "1.22"
build-tags:
- dynamic
- test
@@ -51,7 +38,6 @@ linters-settings:
enable: # add extra linters
- nilness
gofumpt:
lang-version: "1.18"
module-path: github.com/milvus-io
goimports:
local-prefixes: github.com/milvus-io
@@ -142,6 +128,19 @@ linters-settings:
#- 'fmt\.Print.*' WIP
issues:
exclude-dirs:
- build
- configs
- deployments
- docs
- scripts
- internal/core
- cmake_build
- mmap
- data
- ci
exclude-files:
- partial_search_test.go
exclude-use-default: false
exclude-rules:
- path: .+_test\.go
@@ -176,6 +175,31 @@ issues:
- SA1019
# defer return errors
- SA5001
# TODO: cleanup following exclusions, added on golangci-lint upgrade
- sloppyLen
- dupSubExpr
- assignOp
- ifElseChain
- elseif
- commentFormatting
- var-naming
- exitAfterDefer
- captLocal
- singleCaseSwitch
- typeSwitchVar
- indent-error-flow
- appendAssign
- deprecatedComment
- SA9009
- SA1006
- S1009
- unlambda
- dupCase
- dupArg
- offBy1
- unslice
# Integer overflow conversion
- G115
# Maximum issues count per one linter. Set to 0 to disable. Default is 50.
max-issues-per-linter: 0

View File

@@ -45,7 +45,7 @@ ifdef USE_OPENDAL
use_opendal = ${USE_OPENDAL}
endif
# golangci-lint
GOLANGCI_LINT_VERSION := 1.55.2
GOLANGCI_LINT_VERSION := 1.64.7
GOLANGCI_LINT_OUTPUT := $(shell $(INSTALL_PATH)/golangci-lint --version 2>/dev/null)
INSTALL_GOLANGCI_LINT := $(findstring $(GOLANGCI_LINT_VERSION), $(GOLANGCI_LINT_OUTPUT))
# mockery

View File

@@ -1,15 +1,5 @@
run:
go: "1.21"
skip-dirs:
- build
- configs
- deployments
- docs
- scripts
- internal/core
- cmake_build
skip-files:
- partial_search_test.go
go: "1.22"
linters:
disable-all: true
@@ -42,7 +32,6 @@ linters-settings:
- prefix(github.com/milvus-io)
custom-order: true
gofumpt:
lang-version: "1.18"
module-path: github.com/milvus-io
goimports:
local-prefixes: github.com/milvus-io
@@ -129,6 +118,16 @@ linters-settings:
#- 'fmt\.Print.*' WIP
issues:
exclude-dirs:
- build
- configs
- deployments
- docs
- scripts
- internal/core
- cmake_build
exclude-files:
- partial_search_test.go
exclude-use-default: false
exclude-rules:
- path: .+_test\.go
@@ -161,6 +160,31 @@ issues:
- SA1019
# defer return errors
- SA5001
# TODO: cleanup following exclusions, added on golangci-lint upgrade
- sloppyLen
- dupSubExpr
- assignOp
- ifElseChain
- elseif
- commentFormatting
- var-naming
- exitAfterDefer
- captLocal
- singleCaseSwitch
- typeSwitchVar
- indent-error-flow
- appendAssign
- deprecatedComment
- SA9009
- SA1006
- S1009
- unlambda
- dupCase
- dupArg
- offBy1
- unslice
# Integer overflow conversion
- G115
# Maximum issues count per one linter. Set to 0 to disable. Default is 50.
max-issues-per-linter: 0

View File

@@ -53,28 +53,11 @@ func mergeSortMultipleSegments(ctx context.Context,
segmentReaders := make([]storage.RecordReader, len(binlogs))
segmentFilters := make([]compaction.EntityFilter, len(binlogs))
for i, s := range binlogs {
var binlogBatchCount int
for _, b := range s.GetFieldBinlogs() {
if b != nil {
binlogBatchCount = len(b.GetBinlogs())
break
}
reader, err := storage.NewBinlogRecordReader(ctx, s.GetFieldBinlogs(), plan.GetSchema(), storage.WithDownloader(binlogIO.Download))
if err != nil {
return nil, err
}
if binlogBatchCount == 0 {
log.Warn("compacting empty segment", zap.Int64("segmentID", s.GetSegmentID()))
continue
}
binlogPaths := make([][]string, binlogBatchCount)
for idx := 0; idx < binlogBatchCount; idx++ {
var batchPaths []string
for _, f := range s.GetFieldBinlogs() {
batchPaths = append(batchPaths, f.GetBinlogs()[idx].GetLogPath())
}
binlogPaths[idx] = batchPaths
}
segmentReaders[i] = NewSegmentRecordReader(ctx, binlogPaths, binlogIO)
segmentReaders[i] = reader
deltalogPaths := make([]string, 0)
for _, d := range s.GetDeltalogs() {
for _, l := range d.GetBinlogs() {

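For context on the new call above: the compactor no longer assembles per-batch binlog path lists itself; it builds a reader via storage.NewBinlogRecordReader(..., storage.WithDownloader(binlogIO.Download)) and simply drains it. Below is a minimal, hypothetical sketch of that consumption loop (not part of the patch), assuming the Next() (Record, error) / io.EOF contract shown in the storage changes later in this commit and the io and storage imports already present in this file.

// drainReader is an illustrative helper: it consumes any storage.RecordReader,
// such as the one created above, until the reader reports io.EOF, and returns
// how many record batches were read.
func drainReader(reader storage.RecordReader) (int, error) {
	batches := 0
	for {
		if _, err := reader.Next(); err != nil {
			if err == io.EOF {
				return batches, nil // all chunks of the segment have been read
			}
			return batches, err
		}
		batches++
	}
}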
View File

@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@@ -346,7 +347,7 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegmentLackBinlog() {
deleteTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(10*time.Second), 0)
addedFieldSet := typeutil.NewSet[int64]()
for _, f := range s.meta.GetSchema().GetFields() {
if f.FieldID == common.RowIDField || f.FieldID == common.TimeStampField || f.IsPrimaryKey || typeutil.IsVectorType(f.DataType) {
if !f.Nullable {
continue
}
addedFieldSet.Insert(f.FieldID)
@@ -454,7 +455,6 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
}
func (s *MixCompactionTaskSuite) TestMergeNoExpirationLackBinlog() {
s.T().Skip() // Skip added field related tests for now.
s.initSegBuffer(1, 4)
deleteTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(10*time.Second), 0)
tests := []struct {
@@ -471,18 +471,16 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpirationLackBinlog() {
alloc := allocator.NewLocalAllocator(888888, math.MaxInt64)
addedFieldSet := typeutil.NewSet[int64]()
for _, f := range s.meta.GetSchema().GetFields() {
if f.FieldID == common.RowIDField || f.FieldID == common.TimeStampField || f.IsPrimaryKey || typeutil.IsVectorType(f.DataType) {
if !f.Nullable {
continue
}
addedFieldSet.Insert(f.FieldID)
}
assert.NotEmpty(s.T(), addedFieldSet)
kvs, fBinlogs, err := serializeWrite(context.TODO(), alloc, s.segWriter)
for fid, binlog := range fBinlogs {
if addedFieldSet.Contain(fid) {
if rand.Intn(2) == 0 {
continue
}
for _, k := range binlog.GetBinlogs() {
delete(kvs, k.LogPath)
}
@@ -996,6 +994,7 @@ func genTestCollectionMeta() *etcdpb.CollectionMeta {
FieldID: StringField,
Name: "field_string",
DataType: schemapb.DataType_String,
Nullable: true,
},
{
FieldID: VarCharField,

View File

@@ -1,31 +0,0 @@
package compactor
import (
"context"
"io"
"github.com/samber/lo"
binlogIO "github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/storage"
)
func NewSegmentRecordReader(ctx context.Context, binlogPaths [][]string, binlogIO binlogIO.BinlogIO) storage.RecordReader {
pos := 0
return &storage.CompositeBinlogRecordReader{
BlobsReader: func() ([]*storage.Blob, error) {
if pos >= len(binlogPaths) {
return nil, io.EOF
}
bytesArr, err := binlogIO.Download(ctx, binlogPaths[pos])
if err != nil {
return nil, err
}
blobs := lo.Map(bytesArr, func(v []byte, i int) *storage.Blob {
return &storage.Blob{Key: binlogPaths[pos][i], Value: v}
})
pos++
return blobs, nil
},
}
}

View File

@@ -20,11 +20,8 @@ import (
"context"
"fmt"
"math"
"strconv"
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/array"
"github.com/apache/arrow/go/v17/arrow/memory"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
@@ -369,44 +366,7 @@ func (w *SegmentWriter) WriteRecord(r storage.Record) error {
w.rowCount.Inc()
}
builders := make([]array.Builder, len(w.sch.Fields))
for i, f := range w.sch.Fields {
var b array.Builder
if r.Column(f.FieldID) == nil {
b = array.NewBuilder(memory.DefaultAllocator, storage.MilvusDataTypeToArrowType(f.GetDataType(), 1))
} else {
b = array.NewBuilder(memory.DefaultAllocator, r.Column(f.FieldID).DataType())
}
builders[i] = b
}
for c, builder := range builders {
fid := w.sch.Fields[c].FieldID
defaultValue := w.sch.Fields[c].GetDefaultValue()
for i := 0; i < rows; i++ {
if err := storage.AppendValueAt(builder, r.Column(fid), i, defaultValue); err != nil {
return err
}
}
}
arrays := make([]arrow.Array, len(builders))
fields := make([]arrow.Field, len(builders))
field2Col := make(map[typeutil.UniqueID]int, len(builders))
for c, builder := range builders {
arrays[c] = builder.NewArray()
fid := w.sch.Fields[c].FieldID
fields[c] = arrow.Field{
Name: strconv.Itoa(int(fid)),
Type: arrays[c].DataType(),
Nullable: true, // No nullable check here.
}
field2Col[fid] = c
}
rec := storage.NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(rows)), field2Col)
defer rec.Release()
return w.writer.Write(rec)
return w.writer.Write(r)
}
func (w *SegmentWriter) Write(v *storage.Value) error {

View File

@@ -20,6 +20,7 @@ import (
"context"
"fmt"
sio "io"
"sort"
"github.com/samber/lo"
@@ -36,11 +37,16 @@ const (
StorageV2 int64 = 2
)
type (
downloaderFn func(ctx context.Context, paths []string) ([][]byte, error)
uploaderFn func(ctx context.Context, kvs map[string][]byte) error
)
type rwOptions struct {
version int64
bufferSize int64
downloader func(ctx context.Context, paths []string) ([][]byte, error)
uploader func(ctx context.Context, kvs map[string][]byte) error
downloader downloaderFn
uploader uploaderFn
multiPartUploadSize int64
columnGroups []storagecommon.ColumnGroup
}
@@ -90,6 +96,77 @@ func WithColumnGroups(columnGroups []storagecommon.ColumnGroup) RwOption {
}
}
func makeBlobsReader(ctx context.Context, binlogs []*datapb.FieldBinlog, downloader downloaderFn) (ChunkedBlobsReader, error) {
if len(binlogs) == 0 {
return func() ([]*Blob, error) {
return nil, sio.EOF
}, nil
}
sort.Slice(binlogs, func(i, j int) bool {
return binlogs[i].FieldID < binlogs[j].FieldID
})
for _, binlog := range binlogs {
sort.Slice(binlog.Binlogs, func(i, j int) bool {
return binlog.Binlogs[i].LogID < binlog.Binlogs[j].LogID
})
}
nChunks := len(binlogs[0].Binlogs)
chunks := make([][]string, nChunks) // i is chunkid, j is fieldid
missingChunks := lo.Map(binlogs, func(binlog *datapb.FieldBinlog, _ int) int {
return nChunks - len(binlog.Binlogs)
})
for i := range nChunks {
chunks[i] = make([]string, 0, len(binlogs))
for j, binlog := range binlogs {
if i >= missingChunks[j] {
idx := i - missingChunks[j]
chunks[i] = append(chunks[i], binlog.Binlogs[idx].LogPath)
}
}
}
// verify if the chunks order is correct.
// the zig-zag order should have a (strict) increasing order on logids.
// lastLogID := int64(-1)
// for _, paths := range chunks {
// lastFieldID := int64(-1)
// for _, path := range paths {
// _, _, _, fieldID, logID, ok := metautil.ParseInsertLogPath(path)
// if !ok {
// return nil, merr.WrapErrIoFailedReason(fmt.Sprintf("malformed log path %s", path))
// }
// if fieldID < lastFieldID {
// return nil, merr.WrapErrIoFailedReason(fmt.Sprintf("unaligned log path %s, fieldID %d less than lastFieldID %d", path, fieldID, lastFieldID))
// }
// if logID < lastLogID {
// return nil, merr.WrapErrIoFailedReason(fmt.Sprintf("unaligned log path %s, logID %d less than lastLogID %d", path, logID, lastLogID))
// }
// lastLogID = logID
// lastFieldID = fieldID
// }
// }
chunkPos := 0
return func() ([]*Blob, error) {
if chunkPos >= nChunks {
return nil, sio.EOF
}
vals, err := downloader(ctx, chunks[chunkPos])
if err != nil {
return nil, err
}
blobs := make([]*Blob, 0, len(vals))
for i := range vals {
blobs = append(blobs, &Blob{
Key: chunks[chunkPos][i],
Value: vals[i],
})
}
chunkPos++
return blobs, nil
}, nil
}
func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, schema *schemapb.CollectionSchema, option ...RwOption) (RecordReader, error) {
rwOptions := DefaultRwOptions()
for _, opt := range option {
@@ -97,28 +174,11 @@ func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, s
}
switch rwOptions.version {
case StorageV1:
itr := 0
return newCompositeBinlogRecordReader(schema, func() ([]*Blob, error) {
if len(binlogs) <= 0 {
return nil, sio.EOF
}
paths := make([]string, len(binlogs))
for i, fieldBinlog := range binlogs {
if itr >= len(fieldBinlog.GetBinlogs()) {
return nil, sio.EOF
}
paths[i] = fieldBinlog.GetBinlogs()[itr].GetLogPath()
}
itr++
values, err := rwOptions.downloader(ctx, paths)
if err != nil {
return nil, err
}
blobs := lo.Map(values, func(v []byte, i int) *Blob {
return &Blob{Key: paths[i], Value: v}
})
return blobs, nil
})
blobsReader, err := makeBlobsReader(ctx, binlogs, rwOptions.downloader)
if err != nil {
return nil, err
}
return newCompositeBinlogRecordReader(schema, blobsReader)
case StorageV2:
if len(binlogs) <= 0 {
return nil, sio.EOF

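The core of this file's change is the alignment rule in makeBlobsReader: field binlogs are sorted by field ID, each field's binlogs by log ID, and a field with fewer binlogs than the first field (i.e. a field added after some chunks were already written) only participates in the trailing chunks. The following is a self-contained sketch of that rule using plain strings instead of the Milvus types; the field IDs and paths are made up for illustration, and this is not the production code.

package main

import (
	"fmt"
	"sort"
)

// alignChunks groups per-field binlog paths into chunks, aligning every field
// to the tail of the longest list, mirroring the missingChunks logic above.
func alignChunks(perField map[int64][]string) [][]string {
	fieldIDs := make([]int64, 0, len(perField))
	for id := range perField {
		fieldIDs = append(fieldIDs, id)
	}
	sort.Slice(fieldIDs, func(i, j int) bool { return fieldIDs[i] < fieldIDs[j] })

	// The lowest-ID field is assumed to carry the full chunk count, mirroring
	// nChunks := len(binlogs[0].Binlogs) in the patch.
	nChunks := len(perField[fieldIDs[0]])
	chunks := make([][]string, nChunks)
	for i := 0; i < nChunks; i++ {
		for _, id := range fieldIDs {
			paths := perField[id]
			missing := nChunks - len(paths) // leading chunks this field does not cover
			if i >= missing {
				chunks[i] = append(chunks[i], paths[i-missing])
			}
		}
	}
	return chunks
}

func main() {
	// Field 102 was added after the first chunk had been flushed, so it has one binlog.
	chunks := alignChunks(map[int64][]string{
		100: {"100/1", "100/3"},
		101: {"101/2", "101/4"},
		102: {"102/5"},
	})
	fmt.Println(chunks) // [[100/1 101/2] [100/3 101/4 102/5]]
}

The printed grouping matches the "test added field" case in the test file below.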
View File

@@ -26,6 +26,7 @@ import (
"time"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
@@ -377,3 +378,165 @@ func genCollectionSchemaWithBM25() *schemapb.CollectionSchema {
func getMilvusBirthday() time.Time {
return time.Date(2019, time.Month(5), 30, 0, 0, 0, 0, time.UTC)
}
func Test_makeBlobsReader(t *testing.T) {
ctx := context.Background()
downloader := func(ctx context.Context, paths []string) ([][]byte, error) {
return lo.Map(paths, func(item string, index int) []byte {
return []byte{}
}), nil
}
tests := []struct {
name string
binlogs []*datapb.FieldBinlog
want [][]*Blob
wantErr bool
}{
{
name: "test full",
binlogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{LogPath: "x/1/1/1/100/1"},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{LogPath: "x/1/1/1/101/2"},
},
},
{
FieldID: 102,
Binlogs: []*datapb.Binlog{
{LogPath: "x/1/1/1/102/3"},
},
},
},
want: [][]*Blob{
{
{
Key: "x/1/1/1/100/1",
Value: []byte{},
},
{
Key: "x/1/1/1/101/2",
Value: []byte{},
},
{
Key: "x/1/1/1/102/3",
Value: []byte{},
},
},
},
wantErr: false,
},
{
name: "test added field",
binlogs: []*datapb.FieldBinlog{
{
FieldID: 100,
Binlogs: []*datapb.Binlog{
{LogPath: "x/1/1/1/100/1"},
{LogPath: "x/1/1/1/100/3"},
},
},
{
FieldID: 101,
Binlogs: []*datapb.Binlog{
{LogPath: "x/1/1/1/101/2"},
{LogPath: "x/1/1/1/101/4"},
},
},
{
FieldID: 102,
Binlogs: []*datapb.Binlog{
{LogPath: "x/1/1/1/102/5"},
},
},
},
want: [][]*Blob{
{
{
Key: "x/1/1/1/100/1",
Value: []byte{},
},
{
Key: "x/1/1/1/101/2",
Value: []byte{},
},
},
{
{
Key: "x/1/1/1/100/3",
Value: []byte{},
},
{
Key: "x/1/1/1/101/4",
Value: []byte{},
},
{
Key: "x/1/1/1/102/5",
Value: []byte{},
},
},
},
wantErr: false,
},
// {
// name: "test error",
// binlogs: []*datapb.FieldBinlog{
// {
// FieldID: 100,
// Binlogs: []*datapb.Binlog{
// {LogPath: "x/1/1/1/100/1"},
// {LogPath: "x/1/1/1/100/3"},
// },
// },
// {
// FieldID: 101,
// Binlogs: []*datapb.Binlog{
// {LogPath: "x/1/1/1/101/2"},
// {LogPath: "x/1/1/1/101/4"},
// },
// },
// {
// FieldID: 102,
// Binlogs: []*datapb.Binlog{
// {LogPath: "x/1/1/1/102/5"},
// {LogPath: "x/1/1/1/102/6"},
// },
// },
// },
// want: nil,
// wantErr: true,
// },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
reader, err := makeBlobsReader(ctx, tt.binlogs, downloader)
if err != nil {
if !tt.wantErr {
t.Errorf("makeBlobsReader() error = %v, wantErr %v", err, tt.wantErr)
}
return
}
got := make([][]*Blob, 0)
for {
bs, err := reader()
if err == io.EOF {
break
}
if err != nil {
assert.Fail(t, err.Error())
}
got = append(got, bs)
}
assert.Equal(t, tt.want, got)
})
}
}

View File

@@ -49,21 +49,26 @@ type ChunkedBlobsReader func() ([]*Blob, error)
type CompositeBinlogRecordReader struct {
BlobsReader ChunkedBlobsReader
schema *schemapb.CollectionSchema
index map[FieldID]int16
brs []*BinlogReader
rrs []array.RecordReader
schema *schemapb.CollectionSchema
index map[FieldID]int16
}
func (crr *CompositeBinlogRecordReader) iterateNextBatch() error {
if crr.brs != nil {
for _, er := range crr.brs {
er.Close()
if er != nil {
er.Close()
}
}
}
if crr.rrs != nil {
for _, rr := range crr.rrs {
rr.Release()
if rr != nil {
rr.Release()
}
}
}
@@ -72,13 +77,10 @@ func (crr *CompositeBinlogRecordReader) iterateNextBatch() error {
return err
}
if crr.rrs == nil {
crr.rrs = make([]array.RecordReader, len(blobs))
crr.brs = make([]*BinlogReader, len(blobs))
crr.index = make(map[FieldID]int16, len(blobs))
}
crr.rrs = make([]array.RecordReader, len(crr.schema.Fields))
crr.brs = make([]*BinlogReader, len(crr.schema.Fields))
for i, b := range blobs {
for _, b := range blobs {
reader, err := NewBinlogReader(b.Value)
if err != nil {
return err
@@ -88,12 +90,12 @@ func (crr *CompositeBinlogRecordReader) iterateNextBatch() error {
if err != nil {
return err
}
i := crr.index[reader.FieldID]
rr, err := er.GetArrowRecordReader()
if err != nil {
return err
}
crr.rrs[i] = rr
crr.index[reader.FieldID] = int16(i)
crr.brs[i] = reader
}
return nil
@@ -106,36 +108,45 @@ func (crr *CompositeBinlogRecordReader) Next() (Record, error) {
}
}
composeRecord := func() (Record, bool) {
recs := make([]arrow.Array, len(crr.rrs))
for i, rr := range crr.rrs {
if ok := rr.Next(); !ok {
return nil, false
composeRecord := func() (Record, error) {
recs := make([]arrow.Array, len(crr.schema.Fields))
for i, f := range crr.schema.Fields {
if crr.rrs[i] != nil {
if ok := crr.rrs[i].Next(); !ok {
return nil, io.EOF
}
recs[i] = crr.rrs[i].Record().Column(0)
} else {
// If the field is not in the current batch, fill with null array
// Note that we're intentionally not filling default value here, because the
// deserializer will fill them later.
if !f.Nullable {
return nil, merr.WrapErrServiceInternal(fmt.Sprintf("missing field data %s", f.Name))
}
dim, _ := typeutil.GetDim(f)
builder := array.NewBuilder(memory.DefaultAllocator, serdeMap[f.DataType].arrowType(int(dim)))
builder.AppendNulls(int(crr.rrs[0].Record().NumRows()))
recs[i] = builder.NewArray()
}
recs[i] = rr.Record().Column(0)
}
return &compositeRecord{
index: crr.index,
recs: recs,
}, true
}, nil
}
// Try compose records
var (
r Record
ok bool
)
r, ok = composeRecord()
if !ok {
// If failed the first time, try iterate next batch (blob), the error may be io.EOF
r, err := composeRecord()
if err == io.EOF {
// if EOF, try iterate next batch (blob)
if err := crr.iterateNextBatch(); err != nil {
return nil, err
}
// If iterate next batch success, try compose again
if r, ok = composeRecord(); !ok {
// If the next blob is empty, return io.EOF (it's rare).
return nil, io.EOF
}
r, err = composeRecord() // try compose again
}
if err != nil {
return nil, err
}
return r, nil
}
@@ -170,43 +181,45 @@ func parseBlobKey(blobKey string) (colId FieldID, logId UniqueID) {
}
func MakeBlobsReader(blobs []*Blob) ChunkedBlobsReader {
blobMap := make(map[FieldID][]*Blob)
for _, blob := range blobs {
colId, _ := parseBlobKey(blob.Key)
if _, exists := blobMap[colId]; !exists {
blobMap[colId] = []*Blob{blob}
} else {
blobMap[colId] = append(blobMap[colId], blob)
// sort blobs by log id
sort.Slice(blobs, func(i, j int) bool {
_, iLog := parseBlobKey(blobs[i].Key)
_, jLog := parseBlobKey(blobs[j].Key)
return iLog < jLog
})
var field0 FieldID
pivots := make([]int, 0)
for i, blob := range blobs {
if i == 0 {
field0, _ = parseBlobKey(blob.Key)
pivots = append(pivots, 0)
continue
}
if fieldID, _ := parseBlobKey(blob.Key); fieldID == field0 {
pivots = append(pivots, i)
}
}
sortedBlobs := make([][]*Blob, 0, len(blobMap))
for _, blobsForField := range blobMap {
sort.Slice(blobsForField, func(i, j int) bool {
_, iLog := parseBlobKey(blobsForField[i].Key)
_, jLog := parseBlobKey(blobsForField[j].Key)
return iLog < jLog
})
sortedBlobs = append(sortedBlobs, blobsForField)
}
pivots = append(pivots, len(blobs)) // append a pivot to the end of the slice
chunkPos := 0
return func() ([]*Blob, error) {
if len(sortedBlobs) == 0 || chunkPos >= len(sortedBlobs[0]) {
if chunkPos >= len(pivots)-1 {
return nil, io.EOF
}
blobs := make([]*Blob, len(sortedBlobs))
for fieldPos := range blobs {
blobs[fieldPos] = sortedBlobs[fieldPos][chunkPos]
}
chunk := blobs[pivots[chunkPos]:pivots[chunkPos+1]]
chunkPos++
return blobs, nil
return chunk, nil
}
}
func newCompositeBinlogRecordReader(schema *schemapb.CollectionSchema, blobsReader ChunkedBlobsReader) (*CompositeBinlogRecordReader, error) {
index := make(map[FieldID]int16)
for i, f := range schema.Fields {
index[f.FieldID] = int16(i)
}
return &CompositeBinlogRecordReader{
schema: schema,
BlobsReader: blobsReader,
index: index,
}, nil
}
@@ -235,16 +248,12 @@ func ValueDeserializer(r Record, v []*Value, fieldSchema []*schemapb.FieldSchema
for _, f := range fieldSchema {
j := f.FieldID
dt := f.DataType
if r.Column(j) == nil {
if r.Column(j).IsNull(i) {
if f.GetDefaultValue() != nil {
m[j] = getDefaultValue(f)
} else {
m[j] = nil
}
continue
}
if r.Column(j).IsNull(i) {
m[j] = nil
} else {
d, ok := serdeMap[dt].deserialize(r.Column(j), i)
if ok {
@@ -286,7 +295,15 @@ func NewBinlogDeserializeReader(schema *schemapb.CollectionSchema, blobsReader C
}
func newDeltalogOneFieldReader(blobs []*Blob) (*DeserializeReaderImpl[*DeleteLog], error) {
reader, err := newCompositeBinlogRecordReader(nil, MakeBlobsReader(blobs))
reader, err := newCompositeBinlogRecordReader(
&schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
DataType: schemapb.DataType_VarChar,
},
},
},
MakeBlobsReader(blobs))
if err != nil {
return nil, err
}
@@ -923,7 +940,7 @@ func newDeltalogSerializeWriter(eventWriter *DeltalogStreamWriter, batchSize int
}
rws[0] = rw
compositeRecordWriter := NewCompositeRecordWriter(rws)
return NewSerializeRecordWriter[*DeleteLog](compositeRecordWriter, func(v []*DeleteLog) (Record, error) {
return NewSerializeRecordWriter(compositeRecordWriter, func(v []*DeleteLog) (Record, error) {
builder := array.NewBuilder(memory.DefaultAllocator, arrow.BinaryTypes.String)
for _, vv := range v {

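One more note on the reading path shown above: when a chunk predates a newly added field, composeRecord backfills that column with an all-null Arrow array (via serdeMap[f.DataType].arrowType and AppendNulls), and ValueDeserializer later substitutes the field's default value where one is defined. A minimal standalone illustration of that null backfill with arrow-go v17 follows; the Int64 field type and row count are arbitrary choices for the example, not taken from the patch.

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v17/arrow"
	"github.com/apache/arrow/go/v17/arrow/array"
	"github.com/apache/arrow/go/v17/arrow/memory"
)

func main() {
	// Pretend the current chunk holds 3 rows but has no binlog for a field
	// that was added after the chunk was written.
	const numRows = 3

	// Build an all-null column of the field's Arrow type, mirroring the
	// builder.AppendNulls(...) call in CompositeBinlogRecordReader above.
	builder := array.NewBuilder(memory.DefaultAllocator, arrow.PrimitiveTypes.Int64)
	defer builder.Release()
	builder.AppendNulls(numRows)

	col := builder.NewArray()
	defer col.Release()

	fmt.Println(col.Len(), col.NullN()) // 3 3: every row of the missing field is null
}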
View File

@@ -32,6 +32,7 @@ import (
"github.com/apache/arrow/go/v17/arrow/memory"
"github.com/apache/arrow/go/v17/parquet/file"
"github.com/apache/arrow/go/v17/parquet/pqarrow"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
@@ -44,7 +45,7 @@ import (
func TestBinlogDeserializeReader(t *testing.T) {
t.Run("test empty data", func(t *testing.T) {
reader, err := NewBinlogDeserializeReader(nil, func() ([]*Blob, error) {
reader, err := NewBinlogDeserializeReader(generateTestSchema(), func() ([]*Blob, error) {
return nil, io.EOF
})
assert.NoError(t, err)
@@ -184,7 +185,7 @@ func TestBinlogSerializeWriter(t *testing.T) {
func TestBinlogValueWriter(t *testing.T) {
t.Run("test empty data", func(t *testing.T) {
reader, err := NewBinlogDeserializeReader(nil, func() ([]*Blob, error) {
reader, err := NewBinlogDeserializeReader(generateTestSchema(), func() ([]*Blob, error) {
return nil, io.EOF
})
assert.NoError(t, err)
@@ -708,3 +709,108 @@ func readDeltaLog(size int, blob *Blob) error {
}
return nil
}
func TestMakeBlobsReader(t *testing.T) {
type args struct {
blobs []string
}
tests := []struct {
name string
args args
want [][]string
}{
{
name: "test empty",
args: args{
blobs: nil,
},
want: nil,
},
{
name: "test aligned",
args: args{
blobs: []string{
"x/1/1/1/1/1",
"x/1/1/1/2/2",
"x/1/1/1/3/3",
"x/1/1/1/1/4",
"x/1/1/1/2/5",
"x/1/1/1/3/6",
"x/1/1/1/1/7",
"x/1/1/1/2/8",
"x/1/1/1/3/9",
},
},
want: [][]string{
{"x/1/1/1/1/1", "x/1/1/1/2/2", "x/1/1/1/3/3"},
{"x/1/1/1/1/4", "x/1/1/1/2/5", "x/1/1/1/3/6"},
{"x/1/1/1/1/7", "x/1/1/1/2/8", "x/1/1/1/3/9"},
},
},
{
name: "test added field",
args: args{
blobs: []string{
"x/1/1/1/1/1",
"x/1/1/1/2/2",
"x/1/1/1/1/3",
"x/1/1/1/2/4",
"x/1/1/1/1/5",
"x/1/1/1/2/6",
"x/1/1/1/3/7",
},
},
want: [][]string{
{"x/1/1/1/1/1", "x/1/1/1/2/2"},
{"x/1/1/1/1/3", "x/1/1/1/2/4"},
{"x/1/1/1/1/5", "x/1/1/1/2/6", "x/1/1/1/3/7"},
},
},
{
name: "test if there is a hole",
args: args{
blobs: []string{
"x/1/1/1/1/1",
"x/1/1/1/2/2",
"x/1/1/1/3/3",
"x/1/1/1/1/4",
"x/1/1/1/2/5",
"x/1/1/1/1/6",
"x/1/1/1/2/7",
"x/1/1/1/3/8",
},
},
want: [][]string{
{"x/1/1/1/1/1", "x/1/1/1/2/2", "x/1/1/1/3/3"},
{"x/1/1/1/1/4", "x/1/1/1/2/5"},
{"x/1/1/1/1/6", "x/1/1/1/2/7", "x/1/1/1/3/8"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
blobs := lo.Map(tt.args.blobs, func(item string, index int) *Blob {
return &Blob{
Key: item,
}
})
reader := MakeBlobsReader(blobs)
got := make([][]string, 0)
for {
bs, err := reader()
if err == io.EOF {
break
}
if err != nil {
assert.Fail(t, err.Error())
}
got = append(got, lo.Map(bs, func(item *Blob, index int) string {
return item.Key
}))
}
assert.ElementsMatch(t, tt.want, got)
})
}
}

View File

@@ -1,15 +1,5 @@
run:
go: "1.21"
skip-dirs:
- build
- configs
- deployments
- docs
- scripts
- internal/core
- cmake_build
skip-files:
- partial_search_test.go
go: "1.22"
linters:
disable-all: true
@@ -42,7 +32,6 @@ linters-settings:
- prefix(github.com/milvus-io)
custom-order: true
gofumpt:
lang-version: "1.18"
module-path: github.com/milvus-io
goimports:
local-prefixes: github.com/milvus-io
@@ -129,6 +118,16 @@ linters-settings:
#- 'fmt\.Print.*' WIP
issues:
exclude-dirs:
- build
- configs
- deployments
- docs
- scripts
- internal/core
- cmake_build
exclude-files:
- partial_search_test.go
exclude-use-default: false
exclude-rules:
- path: .+_test\.go
@@ -161,6 +160,31 @@ issues:
- SA1019
# defer return errors
- SA5001
# TODO: cleanup following exclusions, added on golangci-lint upgrade
- sloppyLen
- dupSubExpr
- assignOp
- ifElseChain
- elseif
- commentFormatting
- var-naming
- exitAfterDefer
- captLocal
- singleCaseSwitch
- typeSwitchVar
- indent-error-flow
- appendAssign
- deprecatedComment
- SA9009
- SA1006
- S1009
- unlambda
- dupCase
- dupArg
- offBy1
- unslice
# Integer overflow conversion
- G115
# Maximum issues count per one linter. Set to 0 to disable. Default is 50.
max-issues-per-linter: 0