diff --git a/.golangci.yml b/.golangci.yml
index 79b823753e..8c708d4312 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -1,18 +1,5 @@
 run:
-  go: "1.21"
-  skip-dirs:
-    - build
-    - configs
-    - deployments
-    - docs
-    - scripts
-    - internal/core
-    - cmake_build
-    - mmap
-    - data
-    - ci
-  skip-files:
-    - partial_search_test.go
+  go: "1.22"
   build-tags:
     - dynamic
     - test
@@ -51,7 +38,6 @@ linters-settings:
     enable: # add extra linters
       - nilness
   gofumpt:
-    lang-version: "1.18"
     module-path: github.com/milvus-io
   goimports:
     local-prefixes: github.com/milvus-io
@@ -142,6 +128,19 @@
      #- 'fmt\.Print.*' WIP

 issues:
+  exclude-dirs:
+    - build
+    - configs
+    - deployments
+    - docs
+    - scripts
+    - internal/core
+    - cmake_build
+    - mmap
+    - data
+    - ci
+  exclude-files:
+    - partial_search_test.go
   exclude-use-default: false
   exclude-rules:
     - path: .+_test\.go
@@ -176,6 +175,31 @@
     - SA1019
     # defer return errors
     - SA5001
+    # TODO: cleanup following exclusions, added on golangci-lint upgrade
+    - sloppyLen
+    - dupSubExpr
+    - assignOp
+    - ifElseChain
+    - elseif
+    - commentFormatting
+    - var-naming
+    - exitAfterDefer
+    - captLocal
+    - singleCaseSwitch
+    - typeSwitchVar
+    - indent-error-flow
+    - appendAssign
+    - deprecatedComment
+    - SA9009
+    - SA1006
+    - S1009
+    - unlambda
+    - dupCase
+    - dupArg
+    - offBy1
+    - unslice
+    # Integer overflow conversion
+    - G115

   # Maximum issues count per one linter. Set to 0 to disable. Default is 50.
   max-issues-per-linter: 0
diff --git a/Makefile b/Makefile
index cdb58df8ee..2710bc97dd 100644
--- a/Makefile
+++ b/Makefile
@@ -45,7 +45,7 @@ ifdef USE_OPENDAL
 	use_opendal = ${USE_OPENDAL}
 endif
 # golangci-lint
-GOLANGCI_LINT_VERSION := 1.55.2
+GOLANGCI_LINT_VERSION := 1.64.7
 GOLANGCI_LINT_OUTPUT := $(shell $(INSTALL_PATH)/golangci-lint --version 2>/dev/null)
 INSTALL_GOLANGCI_LINT := $(findstring $(GOLANGCI_LINT_VERSION), $(GOLANGCI_LINT_OUTPUT))
 # mockery
diff --git a/client/.golangci.yml b/client/.golangci.yml
index 8b90a9f55a..d482018027 100644
--- a/client/.golangci.yml
+++ b/client/.golangci.yml
@@ -1,15 +1,5 @@
 run:
-  go: "1.21"
-  skip-dirs:
-    - build
-    - configs
-    - deployments
-    - docs
-    - scripts
-    - internal/core
-    - cmake_build
-  skip-files:
-    - partial_search_test.go
+  go: "1.22"

 linters:
   disable-all: true
@@ -42,7 +32,6 @@
       - prefix(github.com/milvus-io)
     custom-order: true
   gofumpt:
-    lang-version: "1.18"
     module-path: github.com/milvus-io
   goimports:
     local-prefixes: github.com/milvus-io
@@ -129,6 +118,16 @@
      #- 'fmt\.Print.*' WIP

 issues:
+  exclude-dirs:
+    - build
+    - configs
+    - deployments
+    - docs
+    - scripts
+    - internal/core
+    - cmake_build
+  exclude-files:
+    - partial_search_test.go
   exclude-use-default: false
   exclude-rules:
     - path: .+_test\.go
@@ -161,6 +160,31 @@
     - SA1019
     # defer return errors
     - SA5001
+    # TODO: cleanup following exclusions, added on golangci-lint upgrade
+    - sloppyLen
+    - dupSubExpr
+    - assignOp
+    - ifElseChain
+    - elseif
+    - commentFormatting
+    - var-naming
+    - exitAfterDefer
+    - captLocal
+    - singleCaseSwitch
+    - typeSwitchVar
+    - indent-error-flow
+    - appendAssign
+    - deprecatedComment
+    - SA9009
+    - SA1006
+    - S1009
+    - unlambda
+    - dupCase
+    - dupArg
+    - offBy1
+    - unslice
+    # Integer overflow conversion
+    - G115

   # Maximum issues count per one linter. Set to 0 to disable. Default is 50.
   max-issues-per-linter: 0
diff --git a/internal/datanode/compactor/merge_sort.go b/internal/datanode/compactor/merge_sort.go
index b25756ed57..841551d16d 100644
--- a/internal/datanode/compactor/merge_sort.go
+++ b/internal/datanode/compactor/merge_sort.go
@@ -53,28 +53,11 @@ func mergeSortMultipleSegments(ctx context.Context,
 	segmentReaders := make([]storage.RecordReader, len(binlogs))
 	segmentFilters := make([]compaction.EntityFilter, len(binlogs))
 	for i, s := range binlogs {
-		var binlogBatchCount int
-		for _, b := range s.GetFieldBinlogs() {
-			if b != nil {
-				binlogBatchCount = len(b.GetBinlogs())
-				break
-			}
+		reader, err := storage.NewBinlogRecordReader(ctx, s.GetFieldBinlogs(), plan.GetSchema(), storage.WithDownloader(binlogIO.Download))
+		if err != nil {
+			return nil, err
 		}
-
-		if binlogBatchCount == 0 {
-			log.Warn("compacting empty segment", zap.Int64("segmentID", s.GetSegmentID()))
-			continue
-		}
-
-		binlogPaths := make([][]string, binlogBatchCount)
-		for idx := 0; idx < binlogBatchCount; idx++ {
-			var batchPaths []string
-			for _, f := range s.GetFieldBinlogs() {
-				batchPaths = append(batchPaths, f.GetBinlogs()[idx].GetLogPath())
-			}
-			binlogPaths[idx] = batchPaths
-		}
-		segmentReaders[i] = NewSegmentRecordReader(ctx, binlogPaths, binlogIO)
+		segmentReaders[i] = reader
 		deltalogPaths := make([]string, 0)
 		for _, d := range s.GetDeltalogs() {
 			for _, l := range d.GetBinlogs() {
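Note on the hunk above: the compactor no longer batches binlog paths by hand; it hands the raw per-field binlog metadata to storage.NewBinlogRecordReader and injects blob fetching via storage.WithDownloader. A minimal consumption sketch, assuming the RecordReader contract used elsewhere in this diff (Next returning io.EOF at end, plus Close); the identifiers segment, plan, binlogIO, and consume come from the surrounding compaction code and are not defined here:

```go
// Sketch only; assumes the compaction context from the hunk above.
reader, err := storage.NewBinlogRecordReader(ctx,
	segment.GetFieldBinlogs(), // may be ragged: later-added fields have fewer binlogs
	plan.GetSchema(),          // schema drives null-filling for missing nullable fields
	storage.WithDownloader(binlogIO.Download))
if err != nil {
	return nil, err
}
defer reader.Close()
for {
	rec, err := reader.Next()
	if err == io.EOF {
		break
	}
	if err != nil {
		return nil, err
	}
	consume(rec) // hypothetical sink standing in for the merge-sort pipeline
}
```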
diff --git a/internal/datanode/compactor/mix_compactor_test.go b/internal/datanode/compactor/mix_compactor_test.go
index d4a1fd72e6..54d7615d71 100644
--- a/internal/datanode/compactor/mix_compactor_test.go
+++ b/internal/datanode/compactor/mix_compactor_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/cockroachdb/errors"
 	"github.com/samber/lo"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"

@@ -346,7 +347,7 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegmentLackBinlog() {
 	deleteTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(10*time.Second), 0)
 	addedFieldSet := typeutil.NewSet[int64]()
 	for _, f := range s.meta.GetSchema().GetFields() {
-		if f.FieldID == common.RowIDField || f.FieldID == common.TimeStampField || f.IsPrimaryKey || typeutil.IsVectorType(f.DataType) {
+		if !f.Nullable {
 			continue
 		}
 		addedFieldSet.Insert(f.FieldID)
@@ -454,7 +455,6 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
 }

 func (s *MixCompactionTaskSuite) TestMergeNoExpirationLackBinlog() {
-	s.T().Skip() // Skip added field related tests for now.
 	s.initSegBuffer(1, 4)
 	deleteTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(10*time.Second), 0)
 	tests := []struct {
@@ -471,18 +471,16 @@
 	alloc := allocator.NewLocalAllocator(888888, math.MaxInt64)
 	addedFieldSet := typeutil.NewSet[int64]()
 	for _, f := range s.meta.GetSchema().GetFields() {
-		if f.FieldID == common.RowIDField || f.FieldID == common.TimeStampField || f.IsPrimaryKey || typeutil.IsVectorType(f.DataType) {
+		if !f.Nullable {
 			continue
 		}
 		addedFieldSet.Insert(f.FieldID)
 	}
+	assert.NotEmpty(s.T(), addedFieldSet)

 	kvs, fBinlogs, err := serializeWrite(context.TODO(), alloc, s.segWriter)
 	for fid, binlog := range fBinlogs {
 		if addedFieldSet.Contain(fid) {
-			if rand.Intn(2) == 0 {
-				continue
-			}
 			for _, k := range binlog.GetBinlogs() {
 				delete(kvs, k.LogPath)
 			}
@@ -996,6 +994,7 @@ func genTestCollectionMeta() *etcdpb.CollectionMeta {
 			FieldID:  StringField,
 			Name:     "field_string",
 			DataType: schemapb.DataType_String,
+			Nullable: true,
 		},
 		{
 			FieldID: VarCharField,
diff --git a/internal/datanode/compactor/segment_record_reader.go b/internal/datanode/compactor/segment_record_reader.go
deleted file mode 100644
index 916013f10c..0000000000
--- a/internal/datanode/compactor/segment_record_reader.go
+++ /dev/null
@@ -1,31 +0,0 @@
-package compactor
-
-import (
-	"context"
-	"io"
-
-	"github.com/samber/lo"
-
-	binlogIO "github.com/milvus-io/milvus/internal/flushcommon/io"
-	"github.com/milvus-io/milvus/internal/storage"
-)
-
-func NewSegmentRecordReader(ctx context.Context, binlogPaths [][]string, binlogIO binlogIO.BinlogIO) storage.RecordReader {
-	pos := 0
-	return &storage.CompositeBinlogRecordReader{
-		BlobsReader: func() ([]*storage.Blob, error) {
-			if pos >= len(binlogPaths) {
-				return nil, io.EOF
-			}
-			bytesArr, err := binlogIO.Download(ctx, binlogPaths[pos])
-			if err != nil {
-				return nil, err
-			}
-			blobs := lo.Map(bytesArr, func(v []byte, i int) *storage.Blob {
-				return &storage.Blob{Key: binlogPaths[pos][i], Value: v}
-			})
-			pos++
-			return blobs, nil
-		},
-	}
-}
diff --git a/internal/datanode/compactor/segment_writer.go b/internal/datanode/compactor/segment_writer.go
index acf9b2ccb1..ca1cab2dfc 100644
--- a/internal/datanode/compactor/segment_writer.go
+++ b/internal/datanode/compactor/segment_writer.go
@@ -20,11 +20,8 @@ import (
 	"context"
 	"fmt"
 	"math"
-	"strconv"

-	"github.com/apache/arrow/go/v17/arrow"
 	"github.com/apache/arrow/go/v17/arrow/array"
-	"github.com/apache/arrow/go/v17/arrow/memory"
 	"github.com/samber/lo"
 	"go.uber.org/atomic"
 	"go.uber.org/zap"
@@ -369,44 +366,7 @@ func (w *SegmentWriter) WriteRecord(r storage.Record) error {
 			w.rowCount.Inc()
 		}
 	}
-
-	builders := make([]array.Builder, len(w.sch.Fields))
-	for i, f := range w.sch.Fields {
-		var b array.Builder
-		if r.Column(f.FieldID) == nil {
-			b = array.NewBuilder(memory.DefaultAllocator, storage.MilvusDataTypeToArrowType(f.GetDataType(), 1))
-		} else {
-			b = array.NewBuilder(memory.DefaultAllocator, r.Column(f.FieldID).DataType())
-		}
-		builders[i] = b
-	}
-	for c, builder := range builders {
-		fid := w.sch.Fields[c].FieldID
-		defaultValue := w.sch.Fields[c].GetDefaultValue()
-		for i := 0; i < rows; i++ {
-			if err := storage.AppendValueAt(builder, r.Column(fid), i, defaultValue); err != nil {
-				return err
-			}
-		}
-	}
-	arrays := make([]arrow.Array, len(builders))
-	fields := make([]arrow.Field, len(builders))
-	field2Col := make(map[typeutil.UniqueID]int, len(builders))
-
-	for c, builder := range builders {
-		arrays[c] = builder.NewArray()
-		fid := w.sch.Fields[c].FieldID
-		fields[c] = arrow.Field{
-			Name:     strconv.Itoa(int(fid)),
-			Type:     arrays[c].DataType(),
-			Nullable: true, // No nullable check here.
-		}
-		field2Col[fid] = c
-	}
-
-	rec := storage.NewSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(rows)), field2Col)
-	defer rec.Release()
-	return w.writer.Write(rec)
+	return w.writer.Write(r)
 }

 func (w *SegmentWriter) Write(v *storage.Value) error {
diff --git a/internal/storage/rw.go b/internal/storage/rw.go
index c5dc2421cf..4e1b16ac3e 100644
--- a/internal/storage/rw.go
+++ b/internal/storage/rw.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	sio "io"
+	"sort"

 	"github.com/samber/lo"

@@ -36,11 +37,16 @@ const (
 	StorageV2 int64 = 2
 )

+type (
+	downloaderFn func(ctx context.Context, paths []string) ([][]byte, error)
+	uploaderFn   func(ctx context.Context, kvs map[string][]byte) error
+)
+
 type rwOptions struct {
 	version             int64
 	bufferSize          int64
-	downloader          func(ctx context.Context, paths []string) ([][]byte, error)
-	uploader            func(ctx context.Context, kvs map[string][]byte) error
+	downloader          downloaderFn
+	uploader            uploaderFn
 	multiPartUploadSize int64
 	columnGroups        []storagecommon.ColumnGroup
 }
@@ -90,6 +96,77 @@ func WithColumnGroups(columnGroups []storagecommon.ColumnGroup) RwOption {
 	}
 }

+func makeBlobsReader(ctx context.Context, binlogs []*datapb.FieldBinlog, downloader downloaderFn) (ChunkedBlobsReader, error) {
+	if len(binlogs) == 0 {
+		return func() ([]*Blob, error) {
+			return nil, sio.EOF
+		}, nil
+	}
+	sort.Slice(binlogs, func(i, j int) bool {
+		return binlogs[i].FieldID < binlogs[j].FieldID
+	})
+	for _, binlog := range binlogs {
+		sort.Slice(binlog.Binlogs, func(i, j int) bool {
+			return binlog.Binlogs[i].LogID < binlog.Binlogs[j].LogID
+		})
+	}
+	nChunks := len(binlogs[0].Binlogs)
+	chunks := make([][]string, nChunks) // i is chunkid, j is fieldid
+	missingChunks := lo.Map(binlogs, func(binlog *datapb.FieldBinlog, _ int) int {
+		return nChunks - len(binlog.Binlogs)
+	})
+	for i := range nChunks {
+		chunks[i] = make([]string, 0, len(binlogs))
+		for j, binlog := range binlogs {
+			if i >= missingChunks[j] {
+				idx := i - missingChunks[j]
+				chunks[i] = append(chunks[i], binlog.Binlogs[idx].LogPath)
+			}
+		}
+	}
+	// verify if the chunks order is correct.
+	// the zig-zag order should have a (strict) increasing order on logids.
+	// lastLogID := int64(-1)
+	// for _, paths := range chunks {
+	// 	lastFieldID := int64(-1)
+	// 	for _, path := range paths {
+	// 		_, _, _, fieldID, logID, ok := metautil.ParseInsertLogPath(path)
+	// 		if !ok {
+	// 			return nil, merr.WrapErrIoFailedReason(fmt.Sprintf("malformed log path %s", path))
+	// 		}
+	// 		if fieldID < lastFieldID {
+	// 			return nil, merr.WrapErrIoFailedReason(fmt.Sprintf("unaligned log path %s, fieldID %d less than lastFieldID %d", path, fieldID, lastFieldID))
+	// 		}
+	// 		if logID < lastLogID {
+	// 			return nil, merr.WrapErrIoFailedReason(fmt.Sprintf("unaligned log path %s, logID %d less than lastLogID %d", path, logID, lastLogID))
+	// 		}
+	// 		lastLogID = logID
+	// 		lastFieldID = fieldID
+	// 	}
+	// }

+	chunkPos := 0
+	return func() ([]*Blob, error) {
+		if chunkPos >= nChunks {
+			return nil, sio.EOF
+		}
+
+		vals, err := downloader(ctx, chunks[chunkPos])
+		if err != nil {
+			return nil, err
+		}
+		blobs := make([]*Blob, 0, len(vals))
+		for i := range vals {
+			blobs = append(blobs, &Blob{
+				Key:   chunks[chunkPos][i],
+				Value: vals[i],
+			})
+		}
+		chunkPos++
+		return blobs, nil
+	}, nil
+}
+
 func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, schema *schemapb.CollectionSchema, option ...RwOption) (RecordReader, error) {
 	rwOptions := DefaultRwOptions()
 	for _, opt := range option {
@@ -97,28 +174,11 @@ func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, s
 	}
 	switch rwOptions.version {
 	case StorageV1:
-		itr := 0
-		return newCompositeBinlogRecordReader(schema, func() ([]*Blob, error) {
-			if len(binlogs) <= 0 {
-				return nil, sio.EOF
-			}
-			paths := make([]string, len(binlogs))
-			for i, fieldBinlog := range binlogs {
-				if itr >= len(fieldBinlog.GetBinlogs()) {
-					return nil, sio.EOF
-				}
-				paths[i] = fieldBinlog.GetBinlogs()[itr].GetLogPath()
-			}
-			itr++
-			values, err := rwOptions.downloader(ctx, paths)
-			if err != nil {
-				return nil, err
-			}
-			blobs := lo.Map(values, func(v []byte, i int) *Blob {
-				return &Blob{Key: paths[i], Value: v}
-			})
-			return blobs, nil
-		})
+		blobsReader, err := makeBlobsReader(ctx, binlogs, rwOptions.downloader)
+		if err != nil {
+			return nil, err
+		}
+		return newCompositeBinlogRecordReader(schema, blobsReader)
 	case StorageV2:
 		if len(binlogs) <= 0 {
 			return nil, sio.EOF
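The core of the new makeBlobsReader is its chunk-alignment rule: binlogs are sorted by field ID and log ID, and a field that was added to the collection later (so it has fewer binlogs than the first field) is right-aligned, i.e. it is simply absent from the earliest chunks. A self-contained sketch of that rule under assumed simplifications (plain string paths instead of *datapb.FieldBinlog metadata, no downloading); it mirrors the "test added field" case in rw_test.go below:

```go
package main

import "fmt"

// alignChunks models the offset arithmetic in makeBlobsReader: field j is
// missing from the first nChunks-len(paths[j]) chunks and present in all
// later ones. paths is field-major and already sorted by field ID / log ID.
func alignChunks(paths [][]string) [][]string {
	if len(paths) == 0 {
		return nil
	}
	nChunks := len(paths[0]) // field 0 (smallest field ID) is assumed complete
	chunks := make([][]string, nChunks)
	for i := 0; i < nChunks; i++ {
		for _, field := range paths {
			missing := nChunks - len(field)
			if i >= missing {
				chunks[i] = append(chunks[i], field[i-missing])
			}
		}
	}
	return chunks
}

func main() {
	// Field 102 was added later and has a binlog only for the last chunk.
	fmt.Println(alignChunks([][]string{
		{"f100/log1", "f100/log3"},
		{"f101/log2", "f101/log4"},
		{"f102/log5"},
	}))
	// Output: [[f100/log1 f101/log2] [f100/log3 f101/log4 f102/log5]]
}
```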
diff --git a/internal/storage/rw_test.go b/internal/storage/rw_test.go
index 5544032f00..417b297d67 100644
--- a/internal/storage/rw_test.go
+++ b/internal/storage/rw_test.go
@@ -26,6 +26,7 @@ import (
 	"time"

 	"github.com/samber/lo"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
 	"google.golang.org/grpc"
@@ -377,3 +378,165 @@ func genCollectionSchemaWithBM25() *schemapb.CollectionSchema {
 func getMilvusBirthday() time.Time {
 	return time.Date(2019, time.Month(5), 30, 0, 0, 0, 0, time.UTC)
 }
+
+func Test_makeBlobsReader(t *testing.T) {
+	ctx := context.Background()
+	downloader := func(ctx context.Context, paths []string) ([][]byte, error) {
+		return lo.Map(paths, func(item string, index int) []byte {
+			return []byte{}
+		}), nil
+	}
+
+	tests := []struct {
+		name    string
+		binlogs []*datapb.FieldBinlog
+		want    [][]*Blob
+		wantErr bool
+	}{
+		{
+			name: "test full",
+			binlogs: []*datapb.FieldBinlog{
+				{
+					FieldID: 100,
+					Binlogs: []*datapb.Binlog{
+						{LogPath: "x/1/1/1/100/1"},
+					},
+				},
+				{
+					FieldID: 101,
+					Binlogs: []*datapb.Binlog{
+						{LogPath: "x/1/1/1/101/2"},
+					},
+				},
+				{
+					FieldID: 102,
+					Binlogs: []*datapb.Binlog{
+						{LogPath: "x/1/1/1/102/3"},
+					},
+				},
+			},
+			want: [][]*Blob{
+				{
+					{
+						Key:   "x/1/1/1/100/1",
+						Value: []byte{},
+					},
+					{
+						Key:   "x/1/1/1/101/2",
+						Value: []byte{},
+					},
+					{
+						Key:   "x/1/1/1/102/3",
+						Value: []byte{},
+					},
+				},
+			},
+			wantErr: false,
+		},
+
+		{
+			name: "test added field",
+			binlogs: []*datapb.FieldBinlog{
+				{
+					FieldID: 100,
+					Binlogs: []*datapb.Binlog{
+						{LogPath: "x/1/1/1/100/1"},
+						{LogPath: "x/1/1/1/100/3"},
+					},
+				},
+				{
+					FieldID: 101,
+					Binlogs: []*datapb.Binlog{
+						{LogPath: "x/1/1/1/101/2"},
+						{LogPath: "x/1/1/1/101/4"},
+					},
+				},
+				{
+					FieldID: 102,
+					Binlogs: []*datapb.Binlog{
+						{LogPath: "x/1/1/1/102/5"},
+					},
+				},
+			},
+			want: [][]*Blob{
+				{
+					{
+						Key:   "x/1/1/1/100/1",
+						Value: []byte{},
+					},
+					{
+						Key:   "x/1/1/1/101/2",
+						Value: []byte{},
+					},
+				},
+				{
+					{
+						Key:   "x/1/1/1/100/3",
+						Value: []byte{},
+					},
+					{
+						Key:   "x/1/1/1/101/4",
+						Value: []byte{},
+					},
+					{
+						Key:   "x/1/1/1/102/5",
+						Value: []byte{},
+					},
+				},
+			},
+			wantErr: false,
+		},
+
+		// {
+		// 	name: "test error",
+		// 	binlogs: []*datapb.FieldBinlog{
+		// 		{
+		// 			FieldID: 100,
+		// 			Binlogs: []*datapb.Binlog{
+		// 				{LogPath: "x/1/1/1/100/1"},
+		// 				{LogPath: "x/1/1/1/100/3"},
+		// 			},
+		// 		},
+		// 		{
+		// 			FieldID: 101,
+		// 			Binlogs: []*datapb.Binlog{
+		// 				{LogPath: "x/1/1/1/101/2"},
+		// 				{LogPath: "x/1/1/1/101/4"},
+		// 			},
+		// 		},
+		// 		{
+		// 			FieldID: 102,
+		// 			Binlogs: []*datapb.Binlog{
+		// 				{LogPath: "x/1/1/1/102/5"},
+		// 				{LogPath: "x/1/1/1/102/6"},
+		// 			},
+		// 		},
+		// 	},
+		// 	want:    nil,
+		// 	wantErr: true,
+		// },
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			reader, err := makeBlobsReader(ctx, tt.binlogs, downloader)
+			if err != nil {
+				if !tt.wantErr {
+					t.Errorf("makeBlobsReader() error = %v, wantErr %v", err, tt.wantErr)
+				}
+				return
+			}
+			got := make([][]*Blob, 0)
+			for {
+				bs, err := reader()
+				if err == io.EOF {
+					break
+				}
+				if err != nil {
+					assert.Fail(t, err.Error())
+				}
+				got = append(got, bs)
+			}
+			assert.Equal(t, tt.want, got)
+		})
+	}
+}
diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go
index e6c2798884..d5777bafaf 100644
--- a/internal/storage/serde_events.go
+++ b/internal/storage/serde_events.go
@@ -49,21 +49,26 @@ type ChunkedBlobsReader func() ([]*Blob, error)

 type CompositeBinlogRecordReader struct {
 	BlobsReader ChunkedBlobsReader
+	schema      *schemapb.CollectionSchema
+	index       map[FieldID]int16

 	brs []*BinlogReader
 	rrs []array.RecordReader
-
-	schema *schemapb.CollectionSchema
-	index  map[FieldID]int16
 }

 func (crr *CompositeBinlogRecordReader) iterateNextBatch() error {
 	if crr.brs != nil {
 		for _, er := range crr.brs {
-			er.Close()
+			if er != nil {
+				er.Close()
+			}
 		}
+	}
+	if crr.rrs != nil {
 		for _, rr := range crr.rrs {
-			rr.Release()
+			if rr != nil {
+				rr.Release()
+			}
 		}
 	}

@@ -72,13 +77,10 @@
 		return err
 	}

-	if crr.rrs == nil {
-		crr.rrs = make([]array.RecordReader, len(blobs))
-		crr.brs = make([]*BinlogReader, len(blobs))
-		crr.index = make(map[FieldID]int16, len(blobs))
-	}
+	crr.rrs = make([]array.RecordReader, len(crr.schema.Fields))
+	crr.brs = make([]*BinlogReader, len(crr.schema.Fields))

-	for i, b := range blobs {
+	for _, b := range blobs {
 		reader, err := NewBinlogReader(b.Value)
 		if err != nil {
 			return err
@@ -88,12 +90,12 @@
 		if err != nil {
 			return err
 		}
+		i := crr.index[reader.FieldID]
 		rr, err := er.GetArrowRecordReader()
 		if err != nil {
 			return err
 		}
 		crr.rrs[i] = rr
-		crr.index[reader.FieldID] = int16(i)
 		crr.brs[i] = reader
 	}
 	return nil
 }
@@ -106,36 +108,45 @@ func (crr *CompositeBinlogRecordReader) Next() (Record, error) {
 		}
 	}

-	composeRecord := func() (Record, bool) {
-		recs := make([]arrow.Array, len(crr.rrs))
-		for i, rr := range crr.rrs {
-			if ok := rr.Next(); !ok {
-				return nil, false
+	composeRecord := func() (Record, error) {
+		recs := make([]arrow.Array, len(crr.schema.Fields))
+
+		for i, f := range crr.schema.Fields {
+			if crr.rrs[i] != nil {
+				if ok := crr.rrs[i].Next(); !ok {
+					return nil, io.EOF
+				}
+				recs[i] = crr.rrs[i].Record().Column(0)
+			} else {
+				// If the field is not in the current batch, fill with null array
+				// Note that we're intentionally not filling default value here, because the
+				// deserializer will fill them later.
+				if !f.Nullable {
+					return nil, merr.WrapErrServiceInternal(fmt.Sprintf("missing field data %s", f.Name))
+				}
+				dim, _ := typeutil.GetDim(f)
+				builder := array.NewBuilder(memory.DefaultAllocator, serdeMap[f.DataType].arrowType(int(dim)))
+				builder.AppendNulls(int(crr.rrs[0].Record().NumRows()))
+				recs[i] = builder.NewArray()
 			}
-			recs[i] = rr.Record().Column(0)
 		}
 		return &compositeRecord{
 			index: crr.index,
 			recs:  recs,
-		}, true
+		}, nil
 	}

 	// Try compose records
-	var (
-		r  Record
-		ok bool
-	)
-	r, ok = composeRecord()
-	if !ok {
-		// If failed the first time, try iterate next batch (blob), the error may be io.EOF
+	r, err := composeRecord()
+	if err == io.EOF {
+		// if EOF, try iterate next batch (blob)
 		if err := crr.iterateNextBatch(); err != nil {
 			return nil, err
 		}
-		// If iterate next batch success, try compose again
-		if r, ok = composeRecord(); !ok {
-			// If the next blob is empty, return io.EOF (it's rare).
-			return nil, io.EOF
-		}
+		r, err = composeRecord() // try compose again
+	}
+	if err != nil {
+		return nil, err
 	}
 	return r, nil
 }
@@ -170,43 +181,45 @@ func parseBlobKey(blobKey string) (colId FieldID, logId UniqueID) {
 }

 func MakeBlobsReader(blobs []*Blob) ChunkedBlobsReader {
-	blobMap := make(map[FieldID][]*Blob)
-	for _, blob := range blobs {
-		colId, _ := parseBlobKey(blob.Key)
-		if _, exists := blobMap[colId]; !exists {
-			blobMap[colId] = []*Blob{blob}
-		} else {
-			blobMap[colId] = append(blobMap[colId], blob)
+	// sort blobs by log id
+	sort.Slice(blobs, func(i, j int) bool {
+		_, iLog := parseBlobKey(blobs[i].Key)
+		_, jLog := parseBlobKey(blobs[j].Key)
+		return iLog < jLog
+	})
+	var field0 FieldID
+	pivots := make([]int, 0)
+	for i, blob := range blobs {
+		if i == 0 {
+			field0, _ = parseBlobKey(blob.Key)
+			pivots = append(pivots, 0)
+			continue
+		}
+		if fieldID, _ := parseBlobKey(blob.Key); fieldID == field0 {
+			pivots = append(pivots, i)
 		}
 	}
-	sortedBlobs := make([][]*Blob, 0, len(blobMap))
-	for _, blobsForField := range blobMap {
-		sort.Slice(blobsForField, func(i, j int) bool {
-			_, iLog := parseBlobKey(blobsForField[i].Key)
-			_, jLog := parseBlobKey(blobsForField[j].Key)
-
-			return iLog < jLog
-		})
-		sortedBlobs = append(sortedBlobs, blobsForField)
-	}
+	pivots = append(pivots, len(blobs)) // append a pivot to the end of the slice

 	chunkPos := 0
 	return func() ([]*Blob, error) {
-		if len(sortedBlobs) == 0 || chunkPos >= len(sortedBlobs[0]) {
+		if chunkPos >= len(pivots)-1 {
 			return nil, io.EOF
 		}
-		blobs := make([]*Blob, len(sortedBlobs))
-		for fieldPos := range blobs {
-			blobs[fieldPos] = sortedBlobs[fieldPos][chunkPos]
-		}
+		chunk := blobs[pivots[chunkPos]:pivots[chunkPos+1]]
 		chunkPos++
-		return blobs, nil
+		return chunk, nil
 	}
 }

 func newCompositeBinlogRecordReader(schema *schemapb.CollectionSchema, blobsReader ChunkedBlobsReader) (*CompositeBinlogRecordReader, error) {
+	index := make(map[FieldID]int16)
+	for i, f := range schema.Fields {
+		index[f.FieldID] = int16(i)
+	}
 	return &CompositeBinlogRecordReader{
 		schema:      schema,
 		BlobsReader: blobsReader,
+		index:       index,
 	}, nil
 }
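MakeBlobsReader now slices one flat, logID-sorted blob list into chunks by pivoting on reappearances of the first field ID, instead of grouping per field and requiring equal counts per field. A self-contained sketch of the pivot scan (field and log IDs are plain integers here; the real code parses them out of blob keys with parseBlobKey); it reproduces the "test added field" case in serde_events_test.go below:

```go
package main

import "fmt"

// chunkKeys models the pivot scan: a new chunk starts whenever the field ID
// of the very first blob ("field0") reappears. Fields added later simply
// contribute to fewer, later chunks. keys holds (fieldID, logID) pairs
// already sorted by logID; the result is index ranges per chunk.
func chunkKeys(keys [][2]int64) [][]int {
	if len(keys) == 0 {
		return nil
	}
	field0 := keys[0][0]
	pivots := []int{0}
	for i := 1; i < len(keys); i++ {
		if keys[i][0] == field0 {
			pivots = append(pivots, i)
		}
	}
	pivots = append(pivots, len(keys)) // closing pivot
	chunks := make([][]int, 0, len(pivots)-1)
	for c := 0; c+1 < len(pivots); c++ {
		idx := make([]int, 0)
		for i := pivots[c]; i < pivots[c+1]; i++ {
			idx = append(idx, i)
		}
		chunks = append(chunks, idx)
	}
	return chunks
}

func main() {
	// Field 3 appears only in the final chunk, mirroring an added field.
	keys := [][2]int64{{1, 1}, {2, 2}, {1, 3}, {2, 4}, {1, 5}, {2, 6}, {3, 7}}
	fmt.Println(chunkKeys(keys)) // [[0 1] [2 3] [4 5 6]]
}
```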
@@ -235,16 +248,12 @@ func ValueDeserializer(r Record, v []*Value, fieldSchema []*schemapb.FieldSchema
 		for _, f := range fieldSchema {
 			j := f.FieldID
 			dt := f.DataType
-			if r.Column(j) == nil {
+			if r.Column(j).IsNull(i) {
 				if f.GetDefaultValue() != nil {
 					m[j] = getDefaultValue(f)
 				} else {
 					m[j] = nil
 				}
-				continue
-			}
-			if r.Column(j).IsNull(i) {
-				m[j] = nil
 			} else {
 				d, ok := serdeMap[dt].deserialize(r.Column(j), i)
 				if ok {
@@ -286,7 +295,15 @@ func NewBinlogDeserializeReader(schema *schemapb.CollectionSchema, blobsReader C
 }

 func newDeltalogOneFieldReader(blobs []*Blob) (*DeserializeReaderImpl[*DeleteLog], error) {
-	reader, err := newCompositeBinlogRecordReader(nil, MakeBlobsReader(blobs))
+	reader, err := newCompositeBinlogRecordReader(
+		&schemapb.CollectionSchema{
+			Fields: []*schemapb.FieldSchema{
+				{
+					DataType: schemapb.DataType_VarChar,
+				},
+			},
+		},
+		MakeBlobsReader(blobs))
 	if err != nil {
 		return nil, err
 	}
@@ -923,7 +940,7 @@ func newDeltalogSerializeWriter(eventWriter *DeltalogStreamWriter, batchSize int
 	}
 	rws[0] = rw
 	compositeRecordWriter := NewCompositeRecordWriter(rws)
-	return NewSerializeRecordWriter[*DeleteLog](compositeRecordWriter, func(v []*DeleteLog) (Record, error) {
+	return NewSerializeRecordWriter(compositeRecordWriter, func(v []*DeleteLog) (Record, error) {
 		builder := array.NewBuilder(memory.DefaultAllocator, arrow.BinaryTypes.String)

 		for _, vv := range v {
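Taken together, the composeRecord and ValueDeserializer hunks move added-field handling entirely to the read path: a field with no binlog in the current batch is materialized as an all-null Arrow column, and nulls are then resolved to the field's default value (when one is configured) during deserialization. A self-contained sketch of those two steps; the int64 field and the default of 42 are made up for illustration:

```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v17/arrow"
	"github.com/apache/arrow/go/v17/arrow/array"
	"github.com/apache/arrow/go/v17/arrow/memory"
)

func main() {
	const rows = 3

	// Step 1 (composeRecord): synthesize an all-null column for a field that
	// has no data in this batch, sized to the batch's row count.
	b := array.NewBuilder(memory.DefaultAllocator, arrow.PrimitiveTypes.Int64)
	defer b.Release()
	b.AppendNulls(rows)
	col := b.NewArray().(*array.Int64)
	defer col.Release()

	// Step 2 (ValueDeserializer): substitute the schema default for each null
	// slot; a field without a default would yield nil instead.
	defaultValue := int64(42) // stands in for f.GetDefaultValue()
	out := make([]any, rows)
	for i := 0; i < rows; i++ {
		if col.IsNull(i) {
			out[i] = defaultValue
		} else {
			out[i] = col.Value(i)
		}
	}
	fmt.Println(out) // [42 42 42]
}
```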
diff --git a/internal/storage/serde_events_test.go b/internal/storage/serde_events_test.go
index 3257eb823c..babb37cab0 100644
--- a/internal/storage/serde_events_test.go
+++ b/internal/storage/serde_events_test.go
@@ -32,6 +32,7 @@ import (
 	"github.com/apache/arrow/go/v17/arrow/memory"
 	"github.com/apache/arrow/go/v17/parquet/file"
 	"github.com/apache/arrow/go/v17/parquet/pqarrow"
+	"github.com/samber/lo"
 	"github.com/stretchr/testify/assert"
 	"go.uber.org/zap"

@@ -44,7 +45,7 @@ import (

 func TestBinlogDeserializeReader(t *testing.T) {
 	t.Run("test empty data", func(t *testing.T) {
-		reader, err := NewBinlogDeserializeReader(nil, func() ([]*Blob, error) {
+		reader, err := NewBinlogDeserializeReader(generateTestSchema(), func() ([]*Blob, error) {
 			return nil, io.EOF
 		})
 		assert.NoError(t, err)
@@ -184,7 +185,7 @@ func TestBinlogSerializeWriter(t *testing.T) {
 }

 func TestBinlogValueWriter(t *testing.T) {
 	t.Run("test empty data", func(t *testing.T) {
-		reader, err := NewBinlogDeserializeReader(nil, func() ([]*Blob, error) {
+		reader, err := NewBinlogDeserializeReader(generateTestSchema(), func() ([]*Blob, error) {
 			return nil, io.EOF
 		})
 		assert.NoError(t, err)
@@ -708,3 +709,108 @@ func readDeltaLog(size int, blob *Blob) error {
 	}
 	return nil
 }
+
+func TestMakeBlobsReader(t *testing.T) {
+	type args struct {
+		blobs []string
+	}
+	tests := []struct {
+		name string
+		args args
+		want [][]string
+	}{
+		{
+			name: "test empty",
+			args: args{
+				blobs: nil,
+			},
+			want: nil,
+		},
+		{
+			name: "test aligned",
+			args: args{
+				blobs: []string{
+					"x/1/1/1/1/1",
+					"x/1/1/1/2/2",
+					"x/1/1/1/3/3",
+					"x/1/1/1/1/4",
+					"x/1/1/1/2/5",
+					"x/1/1/1/3/6",
+					"x/1/1/1/1/7",
+					"x/1/1/1/2/8",
+					"x/1/1/1/3/9",
+				},
+			},
+			want: [][]string{
+				{"x/1/1/1/1/1", "x/1/1/1/2/2", "x/1/1/1/3/3"},
+				{"x/1/1/1/1/4", "x/1/1/1/2/5", "x/1/1/1/3/6"},
+				{"x/1/1/1/1/7", "x/1/1/1/2/8", "x/1/1/1/3/9"},
+			},
+		},
+		{
+			name: "test added field",
+			args: args{
+				blobs: []string{
+					"x/1/1/1/1/1",
+					"x/1/1/1/2/2",
+					"x/1/1/1/1/3",
+					"x/1/1/1/2/4",
+					"x/1/1/1/1/5",
+					"x/1/1/1/2/6",
+					"x/1/1/1/3/7",
+				},
+			},

+			want: [][]string{
+				{"x/1/1/1/1/1", "x/1/1/1/2/2"},
+				{"x/1/1/1/1/3", "x/1/1/1/2/4"},
+				{"x/1/1/1/1/5", "x/1/1/1/2/6", "x/1/1/1/3/7"},
+			},
+		},
+		{
+			name: "test if there is a hole",
+			args: args{
+				blobs: []string{
+					"x/1/1/1/1/1",
+					"x/1/1/1/2/2",
+					"x/1/1/1/3/3",
+					"x/1/1/1/1/4",
+					"x/1/1/1/2/5",
+					"x/1/1/1/1/6",
+					"x/1/1/1/2/7",
+					"x/1/1/1/3/8",
+				},
+			},

+			want: [][]string{
+				{"x/1/1/1/1/1", "x/1/1/1/2/2", "x/1/1/1/3/3"},
+				{"x/1/1/1/1/4", "x/1/1/1/2/5"},
+				{"x/1/1/1/1/6", "x/1/1/1/2/7", "x/1/1/1/3/8"},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			blobs := lo.Map(tt.args.blobs, func(item string, index int) *Blob {
+				return &Blob{
+					Key: item,
+				}
+			})
+			reader := MakeBlobsReader(blobs)
+			got := make([][]string, 0)
+			for {
+				bs, err := reader()
+				if err == io.EOF {
+					break
+				}
+				if err != nil {
+					assert.Fail(t, err.Error())
+				}
+				got = append(got, lo.Map(bs, func(item *Blob, index int) string {
+					return item.Key
+				}))
+			}
+			assert.ElementsMatch(t, tt.want, got)
+		})
+	}
+}
diff --git a/tests/go_client/.golangci.yml b/tests/go_client/.golangci.yml
index 8b90a9f55a..d482018027 100644
--- a/tests/go_client/.golangci.yml
+++ b/tests/go_client/.golangci.yml
@@ -1,15 +1,5 @@
 run:
-  go: "1.21"
-  skip-dirs:
-    - build
-    - configs
-    - deployments
-    - docs
-    - scripts
-    - internal/core
-    - cmake_build
-  skip-files:
-    - partial_search_test.go
+  go: "1.22"

 linters:
   disable-all: true
@@ -42,7 +32,6 @@
       - prefix(github.com/milvus-io)
     custom-order: true
   gofumpt:
-    lang-version: "1.18"
     module-path: github.com/milvus-io
   goimports:
     local-prefixes: github.com/milvus-io
@@ -129,6 +118,16 @@
      #- 'fmt\.Print.*' WIP

 issues:
+  exclude-dirs:
+    - build
+    - configs
+    - deployments
+    - docs
+    - scripts
+    - internal/core
+    - cmake_build
+  exclude-files:
+    - partial_search_test.go
   exclude-use-default: false
   exclude-rules:
     - path: .+_test\.go
@@ -161,6 +160,31 @@
     - SA1019
     # defer return errors
     - SA5001
+    # TODO: cleanup following exclusions, added on golangci-lint upgrade
+    - sloppyLen
+    - dupSubExpr
+    - assignOp
+    - ifElseChain
+    - elseif
+    - commentFormatting
+    - var-naming
+    - exitAfterDefer
+    - captLocal
+    - singleCaseSwitch
+    - typeSwitchVar
+    - indent-error-flow
+    - appendAssign
+    - deprecatedComment
+    - SA9009
+    - SA1006
+    - S1009
+    - unlambda
+    - dupCase
+    - dupArg
+    - offBy1
+    - unslice
+    # Integer overflow conversion
+    - G115

   # Maximum issues count per one linter. Set to 0 to disable. Default is 50.
   max-issues-per-linter: 0
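One of the newly excluded checks, gosec's G115 ("integer overflow conversion"), fires on narrowing integer casts; the configs above silence it pending the cleanup noted in the TODO. A minimal self-contained example of the pattern it flags (values chosen for illustration):

```go
package main

import "fmt"

// A narrowing integer conversion can silently wrap; this is exactly what
// G115 reports. With the exclusion in place, code like this lints clean.
func main() {
	var n int64 = 3_000_000_000 // larger than math.MaxInt32
	fmt.Println(int32(n))       // prints -1294967296: wrapped, no runtime error
}
```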