From de02a3ebcc9ca8ba191605e583d4dc2ef6a4ee4f Mon Sep 17 00:00:00 2001 From: sthuang <167743503+shaoting-huang@users.noreply.github.com> Date: Mon, 3 Mar 2025 10:24:02 +0800 Subject: [PATCH] feat: Storage v2 binlog packed record reader and writer (#40221) related: #39173 --------- Signed-off-by: shaoting-huang --- Makefile | 2 +- .../compaction/clustering_compactor_test.go | 10 +- .../datanode/compaction/l0_compactor_test.go | 6 +- .../datanode/compaction/mix_compactor_test.go | 8 +- internal/indexnode/task_stats_test.go | 6 +- .../flushcommon/mock_util}/mock_binlogio.go | 2 +- internal/storage/rw.go | 61 ++- internal/storage/rw_test.go | 381 +++++++++++++++ internal/storage/serde_events_v2.go | 447 ++++++++++++++++-- internal/storage/serde_events_v2_test.go | 66 +-- .../storagecommon/column_group_splitter.go | 49 ++ .../column_group_splitter_test.go | 127 +++++ internal/storagev2/packed/packed_reader.go | 3 +- internal/storagev2/packed/packed_test.go | 12 +- internal/storagev2/packed/packed_writer.go | 12 +- 15 files changed, 1079 insertions(+), 113 deletions(-) rename internal/{flushcommon/io => mocks/flushcommon/mock_util}/mock_binlogio.go (99%) create mode 100644 internal/storage/rw_test.go create mode 100644 internal/storagecommon/column_group_splitter.go create mode 100644 internal/storagecommon/column_group_splitter_test.go diff --git a/Makefile b/Makefile index 5e44a7d959..7fab81fb1d 100644 --- a/Makefile +++ b/Makefile @@ -486,7 +486,7 @@ generate-mockery-flushcommon: getdeps $(INSTALL_PATH)/mockery --name=Task --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_task.go --with-expecter --structname=MockTask --outpkg=syncmgr --inpackage $(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/flushcommon/writebuffer --output=$(PWD)/internal/flushcommon/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage $(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/flushcommon/writebuffer --output=$(PWD)/internal/flushcommon/writebuffer --filename=mock_manager.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage - $(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/flushcommon/io --output=$(PWD)/internal/flushcommon/io --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=io --inpackage + $(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/flushcommon/io --output=$(PWD)/internal/mocks/flushcommon/mock_util --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=mock_util --inpackage=false $(INSTALL_PATH)/mockery --name=FlowgraphManager --dir=$(PWD)/internal/flushcommon/pipeline --output=$(PWD)/internal/flushcommon/pipeline --filename=mock_fgmanager.go --with-expecter --structname=MockFlowgraphManager --outpkg=pipeline --inpackage generate-mockery-metastore: getdeps diff --git a/internal/datanode/compaction/clustering_compactor_test.go b/internal/datanode/compaction/clustering_compactor_test.go index 6fd5f89bf8..64b0fab328 100644 --- a/internal/datanode/compaction/clustering_compactor_test.go +++ b/internal/datanode/compaction/clustering_compactor_test.go @@ -32,7 +32,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datanode/allocator" - "github.com/milvus-io/milvus/internal/flushcommon/io" + 
"github.com/milvus-io/milvus/internal/mocks/flushcommon/mock_util" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" @@ -49,7 +49,7 @@ func TestClusteringCompactionTaskSuite(t *testing.T) { type ClusteringCompactionTaskSuite struct { suite.Suite - mockBinlogIO *io.MockBinlogIO + mockBinlogIO *mock_util.MockBinlogIO mockAlloc *allocator.MockAllocator mockID atomic.Int64 @@ -63,7 +63,7 @@ func (s *ClusteringCompactionTaskSuite) SetupSuite() { } func (s *ClusteringCompactionTaskSuite) SetupTest() { - s.mockBinlogIO = io.NewMockBinlogIO(s.T()) + s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T()) s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Maybe() @@ -490,7 +490,7 @@ func (s *ClusteringCompactionTaskSuite) TestGenerateBM25Stats() { s.Run("upload failed", func() { segmentID := int64(1) - mockBinlogIO := io.NewMockBinlogIO(s.T()) + mockBinlogIO := mock_util.NewMockBinlogIO(s.T()) mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(fmt.Errorf("mock error")).Once() task := &clusteringCompactionTask{ @@ -566,7 +566,7 @@ func (s *ClusteringCompactionTaskSuite) TestGeneratePkStats() { kvs, _, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter) s.NoError(err) - mockBinlogIO := io.NewMockBinlogIO(s.T()) + mockBinlogIO := mock_util.NewMockBinlogIO(s.T()) mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(lo.Values(kvs), nil) mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(fmt.Errorf("mock error")) task := &clusteringCompactionTask{ diff --git a/internal/datanode/compaction/l0_compactor_test.go b/internal/datanode/compaction/l0_compactor_test.go index afcf2f429c..83bbb7c561 100644 --- a/internal/datanode/compaction/l0_compactor_test.go +++ b/internal/datanode/compaction/l0_compactor_test.go @@ -28,9 +28,9 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datanode/allocator" - "github.com/milvus-io/milvus/internal/flushcommon/io" "github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle" "github.com/milvus-io/milvus/internal/mocks" + "github.com/milvus-io/milvus/internal/mocks/flushcommon/mock_util" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/log" @@ -47,7 +47,7 @@ func TestLevelZeroCompactionTaskSuite(t *testing.T) { type LevelZeroCompactionTaskSuite struct { suite.Suite - mockBinlogIO *io.MockBinlogIO + mockBinlogIO *mock_util.MockBinlogIO task *LevelZeroCompactionTask dData *storage.DeleteData @@ -56,7 +56,7 @@ type LevelZeroCompactionTaskSuite struct { func (s *LevelZeroCompactionTaskSuite) SetupTest() { paramtable.Init() - s.mockBinlogIO = io.NewMockBinlogIO(s.T()) + s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T()) // plan of the task is unset s.task = NewLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, nil, nil) diff --git a/internal/datanode/compaction/mix_compactor_test.go b/internal/datanode/compaction/mix_compactor_test.go index 0ed8a2fa29..25535c1808 100644 --- a/internal/datanode/compaction/mix_compactor_test.go +++ b/internal/datanode/compaction/mix_compactor_test.go @@ -32,9 +32,9 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" - "github.com/milvus-io/milvus/internal/flushcommon/io" 
"github.com/milvus-io/milvus/internal/flushcommon/metacache" "github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle" + "github.com/milvus-io/milvus/internal/mocks/flushcommon/mock_util" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" @@ -51,7 +51,7 @@ func TestMixCompactionTaskSuite(t *testing.T) { type MixCompactionTaskSuite struct { suite.Suite - mockBinlogIO *io.MockBinlogIO + mockBinlogIO *mock_util.MockBinlogIO meta *etcdpb.CollectionMeta segWriter *SegmentWriter @@ -64,7 +64,7 @@ func (s *MixCompactionTaskSuite) SetupSuite() { } func (s *MixCompactionTaskSuite) SetupTest() { - s.mockBinlogIO = io.NewMockBinlogIO(s.T()) + s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T()) s.meta = genTestCollectionMeta() @@ -90,7 +90,7 @@ func (s *MixCompactionTaskSuite) SetupTest() { } func (s *MixCompactionTaskSuite) SetupBM25() { - s.mockBinlogIO = io.NewMockBinlogIO(s.T()) + s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T()) s.meta = genTestCollectionMetaWithBM25() plan := &datapb.CompactionPlan{ diff --git a/internal/indexnode/task_stats_test.go b/internal/indexnode/task_stats_test.go index 4e199905cd..1aaa5ed7d1 100644 --- a/internal/indexnode/task_stats_test.go +++ b/internal/indexnode/task_stats_test.go @@ -29,7 +29,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datanode/compaction" - "github.com/milvus-io/milvus/internal/flushcommon/io" + "github.com/milvus-io/milvus/internal/mocks/flushcommon/mock_util" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" @@ -51,7 +51,7 @@ type TaskStatsSuite struct { clusterID string schema *schemapb.CollectionSchema - mockBinlogIO *io.MockBinlogIO + mockBinlogIO *mock_util.MockBinlogIO segWriter *compaction.SegmentWriter } @@ -63,7 +63,7 @@ func (s *TaskStatsSuite) SetupSuite() { func (s *TaskStatsSuite) SetupSubTest() { paramtable.Init() - s.mockBinlogIO = io.NewMockBinlogIO(s.T()) + s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T()) } func (s *TaskStatsSuite) GenSegmentWriterWithBM25(magic int64) { diff --git a/internal/flushcommon/io/mock_binlogio.go b/internal/mocks/flushcommon/mock_util/mock_binlogio.go similarity index 99% rename from internal/flushcommon/io/mock_binlogio.go rename to internal/mocks/flushcommon/mock_util/mock_binlogio.go index 011ae78967..5505f86805 100644 --- a/internal/flushcommon/io/mock_binlogio.go +++ b/internal/mocks/flushcommon/mock_util/mock_binlogio.go @@ -1,6 +1,6 @@ // Code generated by mockery v2.46.0. DO NOT EDIT. 
-package io +package mock_util import ( context "context" diff --git a/internal/storage/rw.go b/internal/storage/rw.go index 642abe0baf..82af293378 100644 --- a/internal/storage/rw.go +++ b/internal/storage/rw.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/storagecommon" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/util/merr" ) @@ -35,17 +36,20 @@ const ( ) type rwOptions struct { - version int64 - bufferSize uint64 - downloader func(ctx context.Context, paths []string) ([][]byte, error) - uploader func(ctx context.Context, kvs map[string][]byte) error + version int64 + bufferSize int64 + downloader func(ctx context.Context, paths []string) ([][]byte, error) + uploader func(ctx context.Context, kvs map[string][]byte) error + multiPartUploadSize int64 + columnGroups []storagecommon.ColumnGroup } type RwOption func(*rwOptions) func defaultRwOptions() *rwOptions { return &rwOptions{ - bufferSize: 32 * 1024 * 1024, + bufferSize: 32 * 1024 * 1024, + multiPartUploadSize: 10 * 1024 * 1024, } } @@ -55,12 +59,18 @@ func WithVersion(version int64) RwOption { } } -func WithBufferSize(bufferSize uint64) RwOption { +func WithBufferSize(bufferSize int64) RwOption { return func(options *rwOptions) { options.bufferSize = bufferSize } } +func WithMultiPartUploadSize(multiPartUploadSize int64) RwOption { + return func(options *rwOptions) { + options.multiPartUploadSize = multiPartUploadSize + } +} + func WithDownloader(downloader func(ctx context.Context, paths []string) ([][]byte, error)) RwOption { return func(options *rwOptions) { options.downloader = downloader @@ -73,6 +83,12 @@ func WithUploader(uploader func(ctx context.Context, kvs map[string][]byte) erro } } +func WithColumnGroups(columnGroups []storagecommon.ColumnGroup) RwOption { + return func(options *rwOptions) { + options.columnGroups = columnGroups + } +} + func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, schema *schemapb.CollectionSchema, option ...RwOption) (RecordReader, error) { rwOptions := defaultRwOptions() for _, opt := range option { @@ -103,7 +119,19 @@ func NewBinlogRecordReader(ctx context.Context, binlogs []*datapb.FieldBinlog, s return blobs, nil }) case StorageV2: - // TODO: integrate v2 + if len(binlogs) <= 0 { + return nil, sio.EOF + } + binlogLists := lo.Map(binlogs, func(fieldBinlog *datapb.FieldBinlog, _ int) []*datapb.Binlog { + return fieldBinlog.GetBinlogs() + }) + paths := make([][]string, len(binlogLists[0])) + for _, binlogs := range binlogLists { + for j, binlog := range binlogs { + paths[j] = append(paths[j], binlog.GetLogPath()) + } + } + return newPackedRecordReader(paths, schema, rwOptions.bufferSize) } return nil, merr.WrapErrServiceInternal(fmt.Sprintf("unsupported storage version %d", rwOptions.version)) } @@ -116,20 +144,23 @@ func NewBinlogRecordWriter(ctx context.Context, collectionID, partitionID, segme for _, opt := range option { opt(rwOptions) } + blobsWriter := func(blobs []*Blob) error { + kvs := make(map[string][]byte, len(blobs)) + for _, blob := range blobs { + kvs[blob.Key] = blob.Value + } + return rwOptions.uploader(ctx, kvs) + } switch rwOptions.version { case StorageV1: - blobsWriter := func(blobs []*Blob) error { - kvs := make(map[string][]byte, len(blobs)) - for _, blob := range blobs { - kvs[blob.Key] = blob.Value - } - return rwOptions.uploader(ctx, kvs) - } return 
newCompositeBinlogRecordWriter(collectionID, partitionID, segmentID, schema, blobsWriter, allocator, chunkSize, rootPath, maxRowNum, ) case StorageV2: - // TODO: integrate v2 + return newPackedBinlogRecordWriter(collectionID, partitionID, segmentID, schema, + blobsWriter, allocator, chunkSize, rootPath, maxRowNum, + rwOptions.bufferSize, rwOptions.multiPartUploadSize, rwOptions.columnGroups, + ) } return nil, merr.WrapErrServiceInternal(fmt.Sprintf("unsupported storage version %d", rwOptions.version)) } diff --git a/internal/storage/rw_test.go b/internal/storage/rw_test.go new file mode 100644 index 0000000000..585a6e73b0 --- /dev/null +++ b/internal/storage/rw_test.go @@ -0,0 +1,381 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "io" + "math" + "strconv" + "sync/atomic" + "testing" + "time" + + "github.com/samber/lo" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "google.golang.org/grpc" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/mocks/flushcommon/mock_util" + "github.com/milvus-io/milvus/internal/storagecommon" + "github.com/milvus-io/milvus/internal/util/initcore" + "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb" + "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/tsoutil" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" +) + +type mockIDAllocator struct{} + +func (tso *mockIDAllocator) AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest, opts ...grpc.CallOption) (*rootcoordpb.AllocIDResponse, error) { + return &rootcoordpb.AllocIDResponse{ + Status: merr.Success(), + ID: int64(1), + Count: req.Count, + }, nil +} + +func newMockIDAllocator() *mockIDAllocator { + return &mockIDAllocator{} +} + +func TestPackedBinlogRecordSuite(t *testing.T) { + suite.Run(t, new(PackedBinlogRecordSuite)) +} + +type PackedBinlogRecordSuite struct { + suite.Suite + + ctx context.Context + mockID atomic.Int64 + logIDAlloc allocator.Interface + mockBinlogIO *mock_util.MockBinlogIO + + collectionID UniqueID + partitionID UniqueID + segmentID UniqueID + schema *schemapb.CollectionSchema + rootPath string + maxRowNum int64 + chunkSize uint64 +} + +func (s *PackedBinlogRecordSuite) SetupTest() { + ctx := context.Background() + s.ctx = ctx + logIDAlloc := allocator.NewLocalAllocator(1, math.MaxInt64) + s.logIDAlloc = logIDAlloc + initcore.InitLocalArrowFileSystem("/tmp") + s.mockID.Store(time.Now().UnixMilli()) + s.mockBinlogIO = mock_util.NewMockBinlogIO(s.T()) + s.collectionID = 
UniqueID(0) + s.partitionID = UniqueID(0) + s.segmentID = UniqueID(0) + s.schema = generateTestSchema() + s.rootPath = "/tmp" + s.maxRowNum = int64(1000) + s.chunkSize = uint64(1024) +} + +func genTestColumnGroups(schema *schemapb.CollectionSchema) []storagecommon.ColumnGroup { + fieldBinlogs := make([]*datapb.FieldBinlog, 0) + for i, field := range schema.Fields { + fieldBinlogs = append(fieldBinlogs, &datapb.FieldBinlog{ + FieldID: field.FieldID, + Binlogs: []*datapb.Binlog{ + { + EntriesNum: int64(10 * (i + 1)), + LogSize: int64(1000 / (i + 1)), + }, + }, + }) + } + return storagecommon.SplitByFieldSize(fieldBinlogs, 10) +} + +func (s *PackedBinlogRecordSuite) TestPackedBinlogRecordIntegration() { + s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil) + rows := 10000 + readBatchSize := 1024 + columnGroups := genTestColumnGroups(s.schema) + wOption := []RwOption{ + WithUploader(func(ctx context.Context, kvs map[string][]byte) error { + return s.mockBinlogIO.Upload(ctx, kvs) + }), + WithVersion(StorageV2), + WithMultiPartUploadSize(0), + WithBufferSize(1 * 1024 * 1024), // 1MB + WithColumnGroups(columnGroups), + } + + w, err := NewBinlogRecordWriter(s.ctx, s.collectionID, s.partitionID, s.segmentID, s.schema, s.logIDAlloc, s.chunkSize, s.rootPath, s.maxRowNum, wOption...) + s.NoError(err) + + blobs, err := generateTestData(rows) + s.NoError(err) + + reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs)) + s.NoError(err) + defer reader.Close() + + for i := 1; i <= rows; i++ { + err = reader.Next() + s.NoError(err) + + value := reader.Value() + rec, err := ValueSerializer([]*Value{value}, s.schema.Fields) + s.NoError(err) + err = w.Write(rec) + s.NoError(err) + } + err = w.Close() + s.NoError(err) + writtenUncompressed := w.GetWrittenUncompressed() + s.Positive(writtenUncompressed) + + rowNum := w.GetRowNum() + s.Equal(rowNum, int64(rows)) + + fieldBinlogs, statsLog, bm25StatsLog := w.GetLogs() + s.Equal(len(fieldBinlogs), len(columnGroups)) + for _, columnGroup := range fieldBinlogs { + s.Equal(len(columnGroup.Binlogs), 1) + s.Equal(columnGroup.Binlogs[0].EntriesNum, int64(rows)) + s.Positive(columnGroup.Binlogs[0].MemorySize) + } + + s.Equal(len(statsLog.Binlogs), 1) + s.Equal(statsLog.Binlogs[0].EntriesNum, int64(rows)) + + s.Equal(len(bm25StatsLog), 0) + + binlogs := lo.Values(fieldBinlogs) + rOption := []RwOption{ + WithVersion(StorageV2), + } + r, err := NewBinlogRecordReader(s.ctx, binlogs, s.schema, rOption...) 
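+ // Storage v2 read path: the field binlogs written above are regrouped per chunk
+ // (one packed file per column group per chunk) and streamed back as arrow record
+ // batches until io.EOF.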
+ s.NoError(err) + for i := 0; i < rows/readBatchSize+1; i++ { + rec, err := r.Next() + s.NoError(err) + if i < rows/readBatchSize { + s.Equal(rec.Len(), readBatchSize) + } else { + s.Equal(rec.Len(), rows%readBatchSize) + } + } + + _, err = r.Next() + s.Equal(err, io.EOF) + err = r.Close() + s.NoError(err) +} + +func (s *PackedBinlogRecordSuite) TestGenerateBM25Stats() { + s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil) + s.schema = genCollectionSchemaWithBM25() + columnGroups := genTestColumnGroups(s.schema) + wOption := []RwOption{ + WithUploader(func(ctx context.Context, kvs map[string][]byte) error { + return s.mockBinlogIO.Upload(ctx, kvs) + }), + WithVersion(StorageV2), + WithMultiPartUploadSize(0), + WithBufferSize(10 * 1024 * 1024), // 10MB + WithColumnGroups(columnGroups), + } + + v := &Value{ + PK: NewVarCharPrimaryKey("0"), + Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)), + Value: genRowWithBM25(0), + } + rec, err := ValueSerializer([]*Value{v}, s.schema.Fields) + s.NoError(err) + + w, err := NewBinlogRecordWriter(s.ctx, s.collectionID, s.partitionID, s.segmentID, s.schema, s.logIDAlloc, s.chunkSize, s.rootPath, s.maxRowNum, wOption...) + s.NoError(err) + err = w.Write(rec) + s.NoError(err) + err = w.Close() + s.NoError(err) + fieldBinlogs, statsLog, bm25StatsLog := w.GetLogs() + s.Equal(len(fieldBinlogs), len(columnGroups)) + + s.Equal(statsLog.Binlogs[0].EntriesNum, int64(1)) + s.Positive(statsLog.Binlogs[0].MemorySize) + + s.Equal(len(bm25StatsLog), 1) + s.Equal(bm25StatsLog[102].Binlogs[0].EntriesNum, int64(1)) + s.Positive(bm25StatsLog[102].Binlogs[0].MemorySize) +} + +func (s *PackedBinlogRecordSuite) TestUnsuportedStorageVersion() { + wOption := []RwOption{ + WithVersion(-1), + } + _, err := NewBinlogRecordWriter(s.ctx, s.collectionID, s.partitionID, s.segmentID, s.schema, s.logIDAlloc, s.chunkSize, s.rootPath, s.maxRowNum, wOption...) + s.Error(err) + + rOption := []RwOption{ + WithVersion(-1), + } + _, err = NewBinlogRecordReader(s.ctx, []*datapb.FieldBinlog{{}}, s.schema, rOption...) + s.Error(err) +} + +func (s *PackedBinlogRecordSuite) TestNoPrimaryKeyError() { + s.schema = &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{ + {FieldID: 13, Name: "field12", DataType: schemapb.DataType_JSON}, + }} + columnGroups := genTestColumnGroups(s.schema) + wOption := []RwOption{ + WithVersion(StorageV2), + WithColumnGroups(columnGroups), + } + _, err := NewBinlogRecordWriter(s.ctx, s.collectionID, s.partitionID, s.segmentID, s.schema, s.logIDAlloc, s.chunkSize, s.rootPath, s.maxRowNum, wOption...) + s.Error(err) +} + +func (s *PackedBinlogRecordSuite) TestConvertArrowSchemaError() { + s.schema = &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{ + {FieldID: 14, Name: "field13", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{}}, + }} + columnGroups := genTestColumnGroups(s.schema) + wOption := []RwOption{ + WithVersion(StorageV2), + WithColumnGroups(columnGroups), + } + _, err := NewBinlogRecordWriter(s.ctx, s.collectionID, s.partitionID, s.segmentID, s.schema, s.logIDAlloc, s.chunkSize, s.rootPath, s.maxRowNum, wOption...) + s.Error(err) +} + +func (s *PackedBinlogRecordSuite) TestEmptyColumnGroup() { + wOption := []RwOption{ + WithVersion(StorageV2), + } + _, err := NewBinlogRecordWriter(s.ctx, s.collectionID, s.partitionID, s.segmentID, s.schema, s.logIDAlloc, s.chunkSize, s.rootPath, s.maxRowNum, wOption...) 
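+ // Storage v2 requires explicit column groups (WithColumnGroups); constructing the
+ // writer without them is expected to fail.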
+ s.Error(err) +} + +func (s *PackedBinlogRecordSuite) TestEmptyBinlog() { + rOption := []RwOption{ + WithVersion(StorageV2), + } + _, err := NewBinlogRecordReader(s.ctx, []*datapb.FieldBinlog{}, s.schema, rOption...) + s.Error(err) +} + +func (s *PackedBinlogRecordSuite) TestAllocIDExhausedError() { + columnGroups := genTestColumnGroups(s.schema) + wOption := []RwOption{ + WithVersion(StorageV2), + WithColumnGroups(columnGroups), + } + logIDAlloc := allocator.NewLocalAllocator(1, 1) + w, err := NewBinlogRecordWriter(s.ctx, s.collectionID, s.partitionID, s.segmentID, s.schema, logIDAlloc, s.chunkSize, s.rootPath, s.maxRowNum, wOption...) + s.NoError(err) + + size := 10 + blobs, err := generateTestData(size) + s.NoError(err) + + reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs)) + s.NoError(err) + defer reader.Close() + + for i := 0; i < size; i++ { + err = reader.Next() + s.NoError(err) + + value := reader.Value() + rec, err := ValueSerializer([]*Value{value}, s.schema.Fields) + s.NoError(err) + err = w.Write(rec) + s.Error(err) + } +} + +func genRowWithBM25(magic int64) map[int64]interface{} { + ts := tsoutil.ComposeTSByTime(getMilvusBirthday(), 0) + return map[int64]interface{}{ + common.RowIDField: magic, + common.TimeStampField: int64(ts), + 100: strconv.FormatInt(magic, 10), + 101: "varchar", + 102: typeutil.CreateAndSortSparseFloatRow(map[uint32]float32{1: 1}), + } +} + +func genCollectionSchemaWithBM25() *schemapb.CollectionSchema { + return &schemapb.CollectionSchema{ + Name: "schema", + Description: "schema", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, + Name: "row_id", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, + Name: "Timestamp", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, + Name: "pk", + DataType: schemapb.DataType_VarChar, + IsPrimaryKey: true, + }, + { + FieldID: 101, + Name: "text", + DataType: schemapb.DataType_VarChar, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.MaxLengthKey, + Value: "8", + }, + }, + }, + { + FieldID: 102, + Name: "sparse", + DataType: schemapb.DataType_SparseFloatVector, + }, + }, + Functions: []*schemapb.FunctionSchema{{ + Name: "BM25", + Id: 100, + Type: schemapb.FunctionType_BM25, + InputFieldNames: []string{"text"}, + InputFieldIds: []int64{101}, + OutputFieldNames: []string{"sparse"}, + OutputFieldIds: []int64{102}, + }}, + } +} + +func getMilvusBirthday() time.Time { + return time.Date(2019, time.Month(5), 30, 0, 0, 0, 0, time.UTC) +} diff --git a/internal/storage/serde_events_v2.go b/internal/storage/serde_events_v2.go index da411123fa..0650f6f1cf 100644 --- a/internal/storage/serde_events_v2.go +++ b/internal/storage/serde_events_v2.go @@ -21,32 +21,73 @@ import ( "io" "github.com/apache/arrow/go/v17/arrow" + "github.com/apache/arrow/go/v17/arrow/array" + "github.com/samber/lo" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/allocator" + "github.com/milvus-io/milvus/internal/storagecommon" "github.com/milvus-io/milvus/internal/storagev2/packed" "github.com/milvus-io/milvus/pkg/v2/common" + "github.com/milvus-io/milvus/pkg/v2/log" + "github.com/milvus-io/milvus/pkg/v2/proto/datapb" + "github.com/milvus-io/milvus/pkg/v2/proto/etcdpb" "github.com/milvus-io/milvus/pkg/v2/util/merr" + "github.com/milvus-io/milvus/pkg/v2/util/metautil" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) type packedRecordReader struct { + paths [][]string + chunk int reader 
*packed.PackedReader - bufferSize int64 - schema *schemapb.CollectionSchema - field2Col map[FieldID]int + bufferSize int64 + arrowSchema *arrow.Schema + field2Col map[FieldID]int } var _ RecordReader = (*packedRecordReader)(nil) +func (pr *packedRecordReader) iterateNextBatch() error { + if pr.reader != nil { + if err := pr.reader.Close(); err != nil { + return err + } + } + + if pr.chunk >= len(pr.paths) { + return io.EOF + } + + reader, err := packed.NewPackedReader(pr.paths[pr.chunk], pr.arrowSchema, pr.bufferSize) + pr.chunk++ + if err != nil { + return merr.WrapErrParameterInvalid("New binlog record packed reader error: %s", err.Error()) + } + pr.reader = reader + return nil +} + func (pr *packedRecordReader) Next() (Record, error) { if pr.reader == nil { - return nil, io.EOF + if err := pr.iterateNextBatch(); err != nil { + return nil, err + } } - rec, err := pr.reader.ReadNext() - if err != nil || rec == nil { - return nil, io.EOF + + for { + rec, err := pr.reader.ReadNext() + if err == io.EOF { + if err := pr.iterateNextBatch(); err != nil { + return nil, err + } + continue + } else if err != nil { + return nil, err + } + return NewSimpleArrowRecord(rec, pr.field2Col), nil } - return NewSimpleArrowRecord(rec, pr.field2Col), nil } func (pr *packedRecordReader) Close() error { @@ -56,30 +97,26 @@ func (pr *packedRecordReader) Close() error { return nil } -func newPackedRecordReader(paths []string, schema *schemapb.CollectionSchema, bufferSize int64, +func newPackedRecordReader(paths [][]string, schema *schemapb.CollectionSchema, bufferSize int64, ) (*packedRecordReader, error) { arrowSchema, err := ConvertToArrowSchema(schema.Fields) if err != nil { return nil, merr.WrapErrParameterInvalid("convert collection schema [%s] to arrow schema error: %s", schema.Name, err.Error()) } - reader, err := packed.NewPackedReader(paths, arrowSchema, bufferSize) - if err != nil { - return nil, merr.WrapErrParameterInvalid("New binlog record packed reader error: %s", err.Error()) - } field2Col := make(map[FieldID]int) for i, field := range schema.Fields { field2Col[field.FieldID] = i } return &packedRecordReader{ - reader: reader, - schema: schema, - bufferSize: bufferSize, - field2Col: field2Col, + paths: paths, + bufferSize: bufferSize, + arrowSchema: arrowSchema, + field2Col: field2Col, }, nil } -func NewPackedDeserializeReader(paths []string, schema *schemapb.CollectionSchema, - bufferSize int64, pkFieldID FieldID, +func NewPackedDeserializeReader(paths [][]string, schema *schemapb.CollectionSchema, + bufferSize int64, ) (*DeserializeReader[*Value], error) { reader, err := newPackedRecordReader(paths, schema, bufferSize) if err != nil { @@ -87,12 +124,23 @@ func NewPackedDeserializeReader(paths []string, schema *schemapb.CollectionSchem } return NewDeserializeReader(reader, func(r Record, v []*Value) error { + pkField := func() *schemapb.FieldSchema { + for _, field := range schema.Fields { + if field.GetIsPrimaryKey() { + return field + } + } + return nil + }() + if pkField == nil { + return merr.WrapErrServiceInternal("no primary key field found") + } + rec, ok := r.(*simpleArrowRecord) if !ok { return merr.WrapErrServiceInternal("can not cast to simple arrow record") } - schema := reader.schema numFields := len(schema.Fields) for i := 0; i < rec.Len(); i++ { if v[i] == nil { @@ -124,8 +172,8 @@ func NewPackedDeserializeReader(paths []string, schema *schemapb.CollectionSchem value.ID = rowID value.Timestamp = m[common.TimeStampField].(int64) - pkCol := rec.field2Col[pkFieldID] - pk, err := 
GenPrimaryKeyByRawData(m[pkFieldID], schema.Fields[pkCol].DataType) + pkCol := rec.field2Col[pkField.FieldID] + pk, err := GenPrimaryKeyByRawData(m[pkField.FieldID], schema.Fields[pkCol].DataType) if err != nil { return err } @@ -141,16 +189,15 @@ func NewPackedDeserializeReader(paths []string, schema *schemapb.CollectionSchem var _ RecordWriter = (*packedRecordWriter)(nil) type packedRecordWriter struct { - writer *packed.PackedWriter - - bufferSize int64 - multiPartUploadSize int64 - columnGroups [][]int - paths []string - schema *arrow.Schema - - numRows int - writtenUncompressed uint64 + writer *packed.PackedWriter + bufferSize int64 + multiPartUploadSize int64 + columnGroups []storagecommon.ColumnGroup + paths []string + schema *arrow.Schema + rowNum int64 + writtenUncompressed uint64 + columnGroupUncompressed []uint64 } func (pw *packedRecordWriter) Write(r Record) error { @@ -158,9 +205,16 @@ func (pw *packedRecordWriter) Write(r Record) error { if !ok { return merr.WrapErrServiceInternal("can not cast to simple arrow record") } - pw.numRows += r.Len() - for _, arr := range rec.r.Columns() { - pw.writtenUncompressed += uint64(calculateArraySize(arr)) + pw.rowNum += int64(r.Len()) + for col, arr := range rec.r.Columns() { + size := uint64(calculateArraySize(arr)) + pw.writtenUncompressed += size + for columnGroup, group := range pw.columnGroups { + if lo.Contains(group.Columns, col) { + pw.columnGroupUncompressed[columnGroup] += size + break + } + } } defer rec.Release() return pw.writer.WriteRecordBatch(rec.r) @@ -170,6 +224,14 @@ func (pw *packedRecordWriter) GetWrittenUncompressed() uint64 { return pw.writtenUncompressed } +func (pw *packedRecordWriter) GetWrittenPaths() []string { + return pw.paths +} + +func (pw *packedRecordWriter) GetWrittenRowNum() int64 { + return pw.rowNum +} + func (pw *packedRecordWriter) Close() error { if pw.writer != nil { return pw.writer.Close() @@ -177,32 +239,331 @@ func (pw *packedRecordWriter) Close() error { return nil } -func NewPackedRecordWriter(paths []string, schema *arrow.Schema, bufferSize int64, multiPartUploadSize int64, columnGroups [][]int) (*packedRecordWriter, error) { +func NewPackedRecordWriter(paths []string, schema *arrow.Schema, bufferSize int64, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup) (*packedRecordWriter, error) { writer, err := packed.NewPackedWriter(paths, schema, bufferSize, multiPartUploadSize, columnGroups) if err != nil { return nil, merr.WrapErrServiceInternal( fmt.Sprintf("can not new packed record writer %s", err.Error())) } + columnGroupUncompressed := make([]uint64, len(columnGroups)) return &packedRecordWriter{ - writer: writer, - schema: schema, - bufferSize: bufferSize, - paths: paths, + writer: writer, + schema: schema, + bufferSize: bufferSize, + paths: paths, + columnGroups: columnGroups, + columnGroupUncompressed: columnGroupUncompressed, }, nil } -func NewPackedSerializeWriter(paths []string, schema *schemapb.CollectionSchema, bufferSize int64, multiPartUploadSize int64, columnGroups [][]int, batchSize int) (*SerializeWriter[*Value], error) { +func NewPackedSerializeWriter(paths []string, schema *schemapb.CollectionSchema, bufferSize int64, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup, batchSize int) (*SerializeWriter[*Value], error) { arrowSchema, err := ConvertToArrowSchema(schema.Fields) if err != nil { return nil, merr.WrapErrServiceInternal( fmt.Sprintf("can not convert collection schema %s to arrow schema: %s", schema.Name, err.Error())) } - 
packedRecordWriter, err := NewPackedRecordWriter(paths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups) + PackedBinlogRecordWriter, err := NewPackedRecordWriter(paths, arrowSchema, bufferSize, multiPartUploadSize, columnGroups) if err != nil { return nil, merr.WrapErrServiceInternal( fmt.Sprintf("can not new packed record writer %s", err.Error())) } - return NewSerializeRecordWriter[*Value](packedRecordWriter, func(v []*Value) (Record, error) { + return NewSerializeRecordWriter[*Value](PackedBinlogRecordWriter, func(v []*Value) (Record, error) { return ValueSerializer(v, schema.Fields) }, batchSize), nil } + +var _ BinlogRecordWriter = (*PackedBinlogRecordWriter)(nil) + +type PackedBinlogRecordWriter struct { + // attributes + collectionID UniqueID + partitionID UniqueID + segmentID UniqueID + schema *schemapb.CollectionSchema + BlobsWriter ChunkedBlobsWriter + allocator allocator.Interface + chunkSize uint64 + rootPath string + maxRowNum int64 + arrowSchema *arrow.Schema + bufferSize int64 + multiPartUploadSize int64 + columnGroups []storagecommon.ColumnGroup + + // writer and stats generated at runtime + writer *packedRecordWriter + pkstats *PrimaryKeyStats + bm25Stats map[int64]*BM25Stats + tsFrom typeutil.Timestamp + tsTo typeutil.Timestamp + rowNum int64 + writtenUncompressed uint64 + + // results + fieldBinlogs map[FieldID]*datapb.FieldBinlog + statsLog *datapb.FieldBinlog + bm25StatsLog map[FieldID]*datapb.FieldBinlog +} + +func (pw *PackedBinlogRecordWriter) Write(r Record) error { + if err := pw.initWriters(); err != nil { + return err + } + + tsArray := r.Column(common.TimeStampField).(*array.Int64) + rows := r.Len() + for i := 0; i < rows; i++ { + ts := typeutil.Timestamp(tsArray.Value(i)) + if ts < pw.tsFrom { + pw.tsFrom = ts + } + if ts > pw.tsTo { + pw.tsTo = ts + } + + switch schemapb.DataType(pw.pkstats.PkType) { + case schemapb.DataType_Int64: + pkArray := r.Column(pw.pkstats.FieldID).(*array.Int64) + pk := &Int64PrimaryKey{ + Value: pkArray.Value(i), + } + pw.pkstats.Update(pk) + case schemapb.DataType_VarChar: + pkArray := r.Column(pw.pkstats.FieldID).(*array.String) + pk := &VarCharPrimaryKey{ + Value: pkArray.Value(i), + } + pw.pkstats.Update(pk) + default: + panic("invalid data type") + } + + for fieldID, stats := range pw.bm25Stats { + field, ok := r.Column(fieldID).(*array.Binary) + if !ok { + return fmt.Errorf("bm25 field value not found") + } + stats.AppendBytes(field.Value(i)) + } + } + + err := pw.writer.Write(r) + if err != nil { + return merr.WrapErrServiceInternal(fmt.Sprintf("write record batch error: %s", err.Error())) + } + return nil +} + +func (pw *PackedBinlogRecordWriter) initWriters() error { + if pw.writer == nil { + logIdStart, _, err := pw.allocator.Alloc(uint32(len(pw.columnGroups))) + if err != nil { + return err + } + paths := []string{} + for columnGroup := range pw.columnGroups { + path := metautil.BuildInsertLogPath(pw.rootPath, pw.collectionID, pw.partitionID, pw.segmentID, typeutil.UniqueID(columnGroup), logIdStart) + paths = append(paths, path) + logIdStart++ + } + pw.writer, err = NewPackedRecordWriter(paths, pw.arrowSchema, pw.bufferSize, pw.multiPartUploadSize, pw.columnGroups) + if err != nil { + return merr.WrapErrServiceInternal(fmt.Sprintf("can not new packed record writer %s", err.Error())) + } + } + return nil +} + +func (pw *PackedBinlogRecordWriter) GetWrittenUncompressed() uint64 { + return pw.writtenUncompressed +} + +func (pw *PackedBinlogRecordWriter) Close() error { + pw.finalizeBinlogs() + if err := 
pw.writeStats(); err != nil { + return err + } + if err := pw.writeBm25Stats(); err != nil { + return err + } + if err := pw.writer.Close(); err != nil { + return err + } + return nil +} + +func (pw *PackedBinlogRecordWriter) finalizeBinlogs() { + if pw.writer == nil { + return + } + pw.rowNum = pw.writer.GetWrittenRowNum() + pw.writtenUncompressed = pw.writer.GetWrittenUncompressed() + if pw.fieldBinlogs == nil { + pw.fieldBinlogs = make(map[FieldID]*datapb.FieldBinlog, len(pw.columnGroups)) + } + for columnGroup := range pw.columnGroups { + columnGroupID := typeutil.UniqueID(columnGroup) + if _, exists := pw.fieldBinlogs[columnGroupID]; !exists { + pw.fieldBinlogs[columnGroupID] = &datapb.FieldBinlog{ + FieldID: columnGroupID, + } + } + pw.fieldBinlogs[columnGroupID].Binlogs = append(pw.fieldBinlogs[columnGroupID].Binlogs, &datapb.Binlog{ + LogSize: int64(pw.writer.columnGroupUncompressed[columnGroup]), // TODO: should provide the log size of each column group file in storage v2 + MemorySize: int64(pw.writer.columnGroupUncompressed[columnGroup]), + LogPath: pw.writer.GetWrittenPaths()[columnGroupID], + EntriesNum: pw.writer.GetWrittenRowNum(), + TimestampFrom: pw.tsFrom, + TimestampTo: pw.tsTo, + }) + } +} + +func (pw *PackedBinlogRecordWriter) writeStats() error { + if pw.pkstats == nil { + return nil + } + + id, err := pw.allocator.AllocOne() + if err != nil { + return err + } + + codec := NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ + ID: pw.collectionID, + Schema: pw.schema, + }) + sblob, err := codec.SerializePkStats(pw.pkstats, pw.rowNum) + if err != nil { + return err + } + + sblob.Key = metautil.BuildStatsLogPath(pw.rootPath, + pw.collectionID, pw.partitionID, pw.segmentID, pw.pkstats.FieldID, id) + + if err := pw.BlobsWriter([]*Blob{sblob}); err != nil { + return err + } + + pw.statsLog = &datapb.FieldBinlog{ + FieldID: pw.pkstats.FieldID, + Binlogs: []*datapb.Binlog{ + { + LogSize: int64(len(sblob.GetValue())), + MemorySize: int64(len(sblob.GetValue())), + LogPath: sblob.Key, + EntriesNum: pw.rowNum, + }, + }, + } + return nil +} + +func (pw *PackedBinlogRecordWriter) writeBm25Stats() error { + if len(pw.bm25Stats) == 0 { + return nil + } + id, _, err := pw.allocator.Alloc(uint32(len(pw.bm25Stats))) + if err != nil { + return err + } + + if pw.bm25StatsLog == nil { + pw.bm25StatsLog = make(map[FieldID]*datapb.FieldBinlog) + } + for fid, stats := range pw.bm25Stats { + bytes, err := stats.Serialize() + if err != nil { + return err + } + key := metautil.BuildBm25LogPath(pw.rootPath, + pw.collectionID, pw.partitionID, pw.segmentID, fid, id) + blob := &Blob{ + Key: key, + Value: bytes, + RowNum: stats.NumRow(), + MemorySize: int64(len(bytes)), + } + if err := pw.BlobsWriter([]*Blob{blob}); err != nil { + return err + } + + fieldLog := &datapb.FieldBinlog{ + FieldID: fid, + Binlogs: []*datapb.Binlog{ + { + LogSize: int64(len(blob.GetValue())), + MemorySize: int64(len(blob.GetValue())), + LogPath: key, + EntriesNum: pw.rowNum, + }, + }, + } + + pw.bm25StatsLog[fid] = fieldLog + id++ + } + + return nil +} + +func (pw *PackedBinlogRecordWriter) GetLogs() ( + fieldBinlogs map[FieldID]*datapb.FieldBinlog, + statsLog *datapb.FieldBinlog, + bm25StatsLog map[FieldID]*datapb.FieldBinlog, +) { + return pw.fieldBinlogs, pw.statsLog, pw.bm25StatsLog +} + +func (pw *PackedBinlogRecordWriter) GetRowNum() int64 { + return pw.rowNum +} + +func newPackedBinlogRecordWriter(collectionID, partitionID, segmentID UniqueID, schema *schemapb.CollectionSchema, + blobsWriter ChunkedBlobsWriter, allocator 
allocator.Interface, chunkSize uint64, rootPath string, maxRowNum int64, bufferSize, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup, +) (*PackedBinlogRecordWriter, error) { + if len(columnGroups) == 0 { + return nil, merr.WrapErrParameterInvalidMsg("please specify column group for packed binlog record writer") + } + arrowSchema, err := ConvertToArrowSchema(schema.Fields) + if err != nil { + return nil, merr.WrapErrParameterInvalid("convert collection schema [%s] to arrow schema error: %s", schema.Name, err.Error()) + } + pkField, err := typeutil.GetPrimaryFieldSchema(schema) + if err != nil { + log.Warn("failed to get pk field from schema") + return nil, err + } + stats, err := NewPrimaryKeyStats(pkField.GetFieldID(), int64(pkField.GetDataType()), maxRowNum) + if err != nil { + return nil, err + } + bm25FieldIDs := lo.FilterMap(schema.GetFunctions(), func(function *schemapb.FunctionSchema, _ int) (int64, bool) { + if function.GetType() == schemapb.FunctionType_BM25 { + return function.GetOutputFieldIds()[0], true + } + return 0, false + }) + bm25Stats := make(map[int64]*BM25Stats, len(bm25FieldIDs)) + for _, fid := range bm25FieldIDs { + bm25Stats[fid] = NewBM25Stats() + } + + return &PackedBinlogRecordWriter{ + collectionID: collectionID, + partitionID: partitionID, + segmentID: segmentID, + schema: schema, + arrowSchema: arrowSchema, + BlobsWriter: blobsWriter, + allocator: allocator, + chunkSize: chunkSize, + rootPath: rootPath, + maxRowNum: maxRowNum, + bufferSize: bufferSize, + multiPartUploadSize: multiPartUploadSize, + columnGroups: columnGroups, + pkstats: stats, + bm25Stats: bm25Stats, + }, nil +} diff --git a/internal/storage/serde_events_v2_test.go b/internal/storage/serde_events_v2_test.go index 8c368d3c8d..493f625fdb 100644 --- a/internal/storage/serde_events_v2_test.go +++ b/internal/storage/serde_events_v2_test.go @@ -17,12 +17,13 @@ package storage import ( + "io" "testing" "github.com/stretchr/testify/assert" + "github.com/milvus-io/milvus/internal/storagecommon" "github.com/milvus-io/milvus/internal/util/initcore" - "github.com/milvus-io/milvus/pkg/v2/common" ) func TestPackedSerde(t *testing.T) { @@ -30,47 +31,56 @@ func TestPackedSerde(t *testing.T) { initcore.InitLocalArrowFileSystem("/tmp") size := 10 - blobs, err := generateTestData(size) - assert.NoError(t, err) - - reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs)) - assert.NoError(t, err) - defer reader.Close() - - paths := []string{"/tmp/0"} + paths := [][]string{{"/tmp/0"}, {"/tmp/1"}} bufferSize := int64(10 * 1024 * 1024) // 10MB schema := generateTestSchema() - group := []int{} - for i := 0; i < len(schema.Fields); i++ { - group = append(group, i) - } - columnGroups := [][]int{group} - multiPartUploadSize := int64(0) - batchSize := 7 - writer, err := NewPackedSerializeWriter(paths, schema, bufferSize, multiPartUploadSize, columnGroups, batchSize) - assert.NoError(t, err) - for i := 1; i <= size; i++ { - err = reader.Next() + prepareChunkData := func(chunkPaths []string, size int) { + blobs, err := generateTestData(size) assert.NoError(t, err) - value := reader.Value() - assertTestData(t, i, value) - err := writer.Write(value) + reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs)) + assert.NoError(t, err) + + group := storagecommon.ColumnGroup{} + for i := 0; i < len(schema.Fields); i++ { + group.Columns = append(group.Columns, i) + } + multiPartUploadSize := int64(0) + batchSize := 7 + writer, err := 
NewPackedSerializeWriter(chunkPaths, generateTestSchema(), bufferSize, multiPartUploadSize, []storagecommon.ColumnGroup{group}, batchSize) + assert.NoError(t, err) + + for i := 1; i <= size; i++ { + err = reader.Next() + assert.NoError(t, err) + + value := reader.Value() + assertTestData(t, i, value) + err := writer.Write(value) + assert.NoError(t, err) + } + err = writer.Close() + assert.NoError(t, err) + err = reader.Close() assert.NoError(t, err) } - err = writer.Close() - assert.NoError(t, err) - reader, err = NewPackedDeserializeReader(paths, schema, bufferSize, common.RowIDField) + for _, chunkPaths := range paths { + prepareChunkData(chunkPaths, size) + } + + reader, err := NewPackedDeserializeReader(paths, schema, bufferSize) assert.NoError(t, err) defer reader.Close() - for i := 1; i <= size; i++ { + for i := 0; i < size*len(paths); i++ { err = reader.Next() assert.NoError(t, err) value := reader.Value() - assertTestData(t, i, value) + assertTestData(t, i%10+1, value) } + err = reader.Next() + assert.Equal(t, err, io.EOF) }) } diff --git a/internal/storagecommon/column_group_splitter.go b/internal/storagecommon/column_group_splitter.go new file mode 100644 index 0000000000..23f467e876 --- /dev/null +++ b/internal/storagecommon/column_group_splitter.go @@ -0,0 +1,49 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storagecommon + +import ( + "github.com/samber/lo" + + "github.com/milvus-io/milvus/pkg/v2/proto/datapb" +) + +type ColumnGroup struct { + Columns []int // column indices +} + +// split by row average size +func SplitByFieldSize(fieldBinlogs []*datapb.FieldBinlog, splitThresHold int64) []ColumnGroup { + groups := make([]ColumnGroup, 0) + shortColumnGroup := ColumnGroup{Columns: make([]int, 0)} + for i, fieldBinlog := range fieldBinlogs { + if len(fieldBinlog.Binlogs) == 0 { + continue + } + totalSize := lo.SumBy(fieldBinlog.Binlogs, func(b *datapb.Binlog) int64 { return b.LogSize }) + totalNumRows := lo.SumBy(fieldBinlog.Binlogs, func(b *datapb.Binlog) int64 { return b.EntriesNum }) + if totalSize/totalNumRows >= splitThresHold { + groups = append(groups, ColumnGroup{Columns: []int{i}}) + } else { + shortColumnGroup.Columns = append(shortColumnGroup.Columns, i) + } + } + if len(shortColumnGroup.Columns) > 0 { + groups = append(groups, shortColumnGroup) + } + return groups +} diff --git a/internal/storagecommon/column_group_splitter_test.go b/internal/storagecommon/column_group_splitter_test.go new file mode 100644 index 0000000000..de1b3491e0 --- /dev/null +++ b/internal/storagecommon/column_group_splitter_test.go @@ -0,0 +1,127 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storagecommon + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/v2/proto/datapb" +) + +func TestSplitByFieldSize(t *testing.T) { + tests := []struct { + name string + fieldBinlogs []*datapb.FieldBinlog + splitThresHold int64 + expected []ColumnGroup + }{ + { + name: "Empty input", + fieldBinlogs: []*datapb.FieldBinlog{}, + splitThresHold: 100, + expected: []ColumnGroup{}, + }, + { + name: "Empty binlogs", + fieldBinlogs: []*datapb.FieldBinlog{{FieldID: 0, Binlogs: []*datapb.Binlog{}}}, + splitThresHold: 100, + expected: []ColumnGroup{}, + }, + { + name: "above threshold", + fieldBinlogs: []*datapb.FieldBinlog{ + { + FieldID: 0, + Binlogs: []*datapb.Binlog{ + {LogSize: 1000, EntriesNum: 10}, + }, + }, + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + {LogSize: 2000, EntriesNum: 10}, + }, + }, + }, + splitThresHold: 50, + expected: []ColumnGroup{ + {Columns: []int{0}}, + {Columns: []int{1}}, + }, + }, + { + name: "one field", + fieldBinlogs: []*datapb.FieldBinlog{ + { + FieldID: 0, + Binlogs: []*datapb.Binlog{ + {LogSize: 100, EntriesNum: 10}, + }, + }, + }, + splitThresHold: 50, + expected: []ColumnGroup{ + {Columns: []int{0}}, + }, + }, + { + name: "Multiple fields, mixed sizes", + fieldBinlogs: []*datapb.FieldBinlog{ + { + FieldID: 0, + Binlogs: []*datapb.Binlog{ // (above) + {LogSize: 500, EntriesNum: 5}, + {LogSize: 500, EntriesNum: 5}, + }, + }, + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + {LogSize: 200, EntriesNum: 20}, // (below) + }, + }, + { + FieldID: 2, + Binlogs: []*datapb.Binlog{ + {LogSize: 500, EntriesNum: 10}, // (threshold) + }, + }, + { + FieldID: 3, + Binlogs: []*datapb.Binlog{ + {LogSize: 400, EntriesNum: 10}, // (below) + }, + }, + }, + splitThresHold: 50, + expected: []ColumnGroup{ + {Columns: []int{0}}, + {Columns: []int{2}}, + {Columns: []int{1, 3}}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := SplitByFieldSize(tt.fieldBinlogs, tt.splitThresHold) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/internal/storagev2/packed/packed_reader.go b/internal/storagev2/packed/packed_reader.go index 0c091e45ab..15402bc9c7 100644 --- a/internal/storagev2/packed/packed_reader.go +++ b/internal/storagev2/packed/packed_reader.go @@ -26,6 +26,7 @@ import "C" import ( "fmt" + "io" "unsafe" "github.com/apache/arrow/go/v17/arrow" @@ -64,7 +65,7 @@ func (pr *PackedReader) ReadNext() (arrow.Record, error) { } if cArr == nil { - return nil, nil // end of stream, no more records to read + return nil, io.EOF // end of stream, no more records to read } // Convert ArrowArray to Go RecordBatch using cdata diff --git a/internal/storagev2/packed/packed_test.go b/internal/storagev2/packed/packed_test.go index c975cc2cb0..f713e471c0 100644 --- 
a/internal/storagev2/packed/packed_test.go +++ b/internal/storagev2/packed/packed_test.go @@ -15,6 +15,7 @@ package packed import ( + "io" "testing" "github.com/apache/arrow/go/v17/arrow" @@ -23,6 +24,7 @@ import ( "github.com/stretchr/testify/suite" "golang.org/x/exp/rand" + "github.com/milvus-io/milvus/internal/storagecommon" "github.com/milvus-io/milvus/internal/util/initcore" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" ) @@ -77,7 +79,7 @@ func (suite *PackedTestSuite) TestPackedOneFile() { batches := 100 paths := []string{"/tmp/100"} - columnGroups := [][]int{{0, 1, 2}} + columnGroups := []storagecommon.ColumnGroup{{Columns: []int{0, 1, 2}}} bufferSize := int64(10 * 1024 * 1024) // 10MB multiPartUploadSize := int64(0) pw, err := NewPackedWriter(paths, suite.schema, bufferSize, multiPartUploadSize, columnGroups) @@ -129,7 +131,7 @@ func (suite *PackedTestSuite) TestPackedMultiFiles() { rec := b.NewRecord() defer rec.Release() paths := []string{"/tmp/100", "/tmp/101"} - columnGroups := [][]int{{2}, {0, 1}} + columnGroups := []storagecommon.ColumnGroup{{Columns: []int{2}}, {Columns: []int{0, 1}}} bufferSize := int64(10 * 1024 * 1024) // 10MB multiPartUploadSize := int64(0) pw, err := NewPackedWriter(paths, suite.schema, bufferSize, multiPartUploadSize, columnGroups) @@ -147,8 +149,10 @@ func (suite *PackedTestSuite) TestPackedMultiFiles() { var rr arrow.Record for { rr, err = reader.ReadNext() - suite.NoError(err) - if rr == nil { + if err == nil { + suite.NotNil(rr) + } + if err == io.EOF { // end of file break } diff --git a/internal/storagev2/packed/packed_writer.go b/internal/storagev2/packed/packed_writer.go index a5cdc9c7c3..5c666b21ef 100644 --- a/internal/storagev2/packed/packed_writer.go +++ b/internal/storagev2/packed/packed_writer.go @@ -31,9 +31,11 @@ import ( "github.com/apache/arrow/go/v17/arrow" "github.com/apache/arrow/go/v17/arrow/cdata" + + "github.com/milvus-io/milvus/internal/storagecommon" ) -func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64, multiPartUploadSize int64, columnGroups [][]int) (*PackedWriter, error) { +func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup) (*PackedWriter, error) { cFilePaths := make([]*C.char, len(filePaths)) for i, path := range filePaths { cFilePaths[i] = C.CString(path) @@ -52,15 +54,15 @@ func NewPackedWriter(filePaths []string, schema *arrow.Schema, bufferSize int64, cColumnGroups := C.NewCColumnGroups() for _, group := range columnGroups { - cGroup := C.malloc(C.size_t(len(group)) * C.size_t(unsafe.Sizeof(C.int(0)))) + cGroup := C.malloc(C.size_t(len(group.Columns)) * C.size_t(unsafe.Sizeof(C.int(0)))) if cGroup == nil { return nil, fmt.Errorf("failed to allocate memory for column groups") } - cGroupSlice := (*[1 << 30]C.int)(cGroup)[:len(group):len(group)] - for i, val := range group { + cGroupSlice := (*[1 << 30]C.int)(cGroup)[:len(group.Columns):len(group.Columns)] + for i, val := range group.Columns { cGroupSlice[i] = C.int(val) } - C.AddCColumnGroup(cColumnGroups, (*C.int)(cGroup), C.int(len(group))) + C.AddCColumnGroup(cColumnGroups, (*C.int)(cGroup), C.int(len(group.Columns))) C.free(cGroup) }
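
For reviewers, a condensed sketch of how the new storage v2 options in this patch fit together, mirroring TestPackedBinlogRecordIntegration: column groups come from storagecommon.SplitByFieldSize, the writer is built with WithVersion(StorageV2) and WithColumnGroups, and the field binlogs it reports are fed back into NewBinlogRecordReader. This is an illustrative sketch only; the package name, caller-supplied IDs, allocator, uploader and record source are placeholders, not part of the patch.

package storagev2usage

import (
	"context"
	"io"

	"github.com/samber/lo"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/storagecommon"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
)

// writeThenReadV2 writes a stream of records as packed (storage v2) binlogs and
// reads them back. existingBinlogs is only used to estimate per-field row sizes
// for the column-group split; upload persists the generated stats blobs.
func writeThenReadV2(
	ctx context.Context,
	collectionID, partitionID, segmentID int64,
	schema *schemapb.CollectionSchema,
	idAlloc allocator.Interface,
	existingBinlogs []*datapb.FieldBinlog,
	upload func(ctx context.Context, kvs map[string][]byte) error,
	records []storage.Record,
) error {
	// Wide columns get their own file; small columns are packed together
	// (the threshold is average bytes per row, 10 as in the tests).
	groups := storagecommon.SplitByFieldSize(existingBinlogs, 10)

	w, err := storage.NewBinlogRecordWriter(ctx, collectionID, partitionID, segmentID,
		schema, idAlloc, 1024 /* chunkSize */, "/tmp" /* rootPath */, 1000 /* maxRowNum */,
		storage.WithVersion(storage.StorageV2),
		storage.WithColumnGroups(groups),
		storage.WithBufferSize(32*1024*1024),
		storage.WithUploader(upload),
	)
	if err != nil {
		return err
	}
	for _, rec := range records {
		if err := w.Write(rec); err != nil {
			return err
		}
	}
	// Close flushes the packed column-group files and writes the pk / BM25 stats logs.
	if err := w.Close(); err != nil {
		return err
	}

	fieldBinlogs, _, _ := w.GetLogs() // one FieldBinlog per column group

	r, err := storage.NewBinlogRecordReader(ctx, lo.Values(fieldBinlogs), schema,
		storage.WithVersion(storage.StorageV2))
	if err != nil {
		return err
	}
	defer r.Close()
	for {
		rec, err := r.Next()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		_ = rec.Len() // consume one arrow batch
	}
}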