diff --git a/internal/core/src/common/Consts.h b/internal/core/src/common/Consts.h index a138f85b09..30824cc2ba 100644 --- a/internal/core/src/common/Consts.h +++ b/internal/core/src/common/Consts.h @@ -102,4 +102,6 @@ const std::string INDEX_NUM_ROWS_KEY = "index_num_rows"; const int64_t STORAGE_V1 = 1; const int64_t STORAGE_V2 = 2; -const std::string UNKNOW_CAST_FUNCTION_NAME = "unknown"; \ No newline at end of file +const std::string UNKNOW_CAST_FUNCTION_NAME = "unknown"; + +const int64_t DEFAULT_SHORT_COLUMN_GROUP_ID = 0; \ No newline at end of file diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index 71a95df58f..9e1322a172 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -1041,88 +1041,74 @@ GetFieldDatasFromStorageV2(std::vector>& remote_files, column_group_files[group_id] = remote_chunk_files; } - for (auto& [column_group_id, remote_chunk_files] : column_group_files) { - auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() - .GetArrowFileSystem(); - // read first file to get path and column offset of the field id - auto file_reader = std::make_shared( - fs, remote_chunk_files[0]); - std::shared_ptr metadata = - file_reader->file_metadata(); + std::vector remote_chunk_files; + if (column_group_files.find(field_id) == column_group_files.end()) { + remote_chunk_files = column_group_files[DEFAULT_SHORT_COLUMN_GROUP_ID]; + } else { + remote_chunk_files = column_group_files[field_id]; + } - auto field_id_mapping = metadata->GetFieldIDMapping(); + auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance() + .GetArrowFileSystem(); + // set up channel for arrow reader + auto field_data_info = FieldDataInfo(); + auto parallel_degree = + static_cast(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); + field_data_info.arrow_reader_channel->set_capacity(parallel_degree); + + auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE); + + for (auto& column_group_file : remote_chunk_files) { + // get all row groups for each file + std::vector> row_group_lists; + auto reader = std::make_shared( + fs, column_group_file); + auto field_id_mapping = reader->file_metadata()->GetFieldIDMapping(); auto it = field_id_mapping.find(field_id); AssertInfo(it != field_id_mapping.end(), "field id {} not found in field id mapping", field_id); auto column_offset = it->second; - AssertInfo(column_offset.path_index < remote_files.size(), - "column offset path index {} is out of range", - column_offset.path_index); - if (column_offset.path_index != column_group_id) { - LOG_INFO("Skip group id {} since target field shall be in group {}", - column_group_id, - column_offset.path_index); - continue; - } - // set up channel for arrow reader - auto field_data_info = FieldDataInfo(); - auto parallel_degree = static_cast( - DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); - field_data_info.arrow_reader_channel->set_capacity(parallel_degree); + auto row_group_num = + reader->file_metadata()->GetRowGroupMetadataVector().size(); + std::vector all_row_groups(row_group_num); + std::iota(all_row_groups.begin(), all_row_groups.end(), 0); + row_group_lists.push_back(all_row_groups); - auto& pool = - ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE); + // create a schema with only the field id + auto field_schema = + reader->schema()->field(column_offset.col_index)->Copy(); + auto arrow_schema = arrow::schema({field_schema}); - for (auto& column_group_file : remote_chunk_files) { - // get all row groups 
for each file - std::vector> row_group_lists; - auto reader = std::make_shared( - fs, column_group_file); - auto row_group_num = - reader->file_metadata()->GetRowGroupMetadataVector().size(); - std::vector all_row_groups(row_group_num); - std::iota(all_row_groups.begin(), all_row_groups.end(), 0); - row_group_lists.push_back(all_row_groups); - - // create a schema with only the field id - auto field_schema = - file_reader->schema()->field(column_offset.col_index)->Copy(); - auto arrow_schema = arrow::schema({field_schema}); - - // split row groups for parallel reading - auto strategy = - std::make_unique( - parallel_degree); - auto load_future = pool.Submit([&]() { - return LoadWithStrategy( - std::vector{column_group_file}, - field_data_info.arrow_reader_channel, - DEFAULT_FIELD_MAX_MEMORY_LIMIT, - std::move(strategy), - row_group_lists, - nullptr); - }); - // read field data from channel - std::shared_ptr r; - while (field_data_info.arrow_reader_channel->pop(r)) { - size_t num_rows = 0; - std::vector> - chunked_arrays; - for (const auto& table : r->arrow_tables) { - num_rows += table->num_rows(); - chunked_arrays.push_back( - table->column(column_offset.col_index)); - } - auto field_data = storage::CreateFieldData( - data_type, field_schema->nullable(), dim, num_rows); - for (const auto& chunked_array : chunked_arrays) { - field_data->FillFieldData(chunked_array); - } - field_data_list.push_back(field_data); + // split row groups for parallel reading + auto strategy = std::make_unique( + parallel_degree); + auto load_future = pool.Submit([&]() { + return LoadWithStrategy(std::vector{column_group_file}, + field_data_info.arrow_reader_channel, + DEFAULT_FIELD_MAX_MEMORY_LIMIT, + std::move(strategy), + row_group_lists, + nullptr); + }); + // read field data from channel + std::shared_ptr r; + while (field_data_info.arrow_reader_channel->pop(r)) { + size_t num_rows = 0; + std::vector> chunked_arrays; + for (const auto& table : r->arrow_tables) { + num_rows += table->num_rows(); + chunked_arrays.push_back( + table->column(column_offset.col_index)); } + auto field_data = storage::CreateFieldData( + data_type, field_schema->nullable(), dim, num_rows); + for (const auto& chunked_array : chunked_arrays) { + field_data->FillFieldData(chunked_array); + } + field_data_list.push_back(field_data); } } return field_data_list; diff --git a/internal/flushcommon/syncmgr/pack_writer_v2.go b/internal/flushcommon/syncmgr/pack_writer_v2.go index 40d149ab4a..bbfc395b9f 100644 --- a/internal/flushcommon/syncmgr/pack_writer_v2.go +++ b/internal/flushcommon/syncmgr/pack_writer_v2.go @@ -118,9 +118,8 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m logs := make(map[int64]*datapb.FieldBinlog) paths := make([]string, 0) - for columnGroup := range columnGroups { - columnGroupID := typeutil.UniqueID(columnGroup) - path := metautil.BuildInsertLogPath(bw.chunkManager.RootPath(), pack.collectionID, pack.partitionID, pack.segmentID, columnGroupID, bw.nextID()) + for _, columnGroup := range columnGroups { + path := metautil.BuildInsertLogPath(bw.chunkManager.RootPath(), pack.collectionID, pack.partitionID, pack.segmentID, columnGroup.GroupID, bw.nextID()) paths = append(paths, path) } tsArray := rec.Column(common.TimeStampField).(*array.Int64) @@ -146,15 +145,15 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m if err = w.Write(rec); err != nil { return nil, err } - for columnGroup := range columnGroups { - columnGroupID := typeutil.UniqueID(columnGroup) 
+ for _, columnGroup := range columnGroups { + columnGroupID := columnGroup.GroupID logs[columnGroupID] = &datapb.FieldBinlog{ FieldID: columnGroupID, Binlogs: []*datapb.Binlog{ { - LogSize: int64(w.GetColumnGroupWrittenUncompressed(columnGroup)), - MemorySize: int64(w.GetColumnGroupWrittenUncompressed(columnGroup)), - LogPath: w.GetWrittenPaths()[columnGroupID], + LogSize: int64(w.GetColumnGroupWrittenUncompressed(columnGroup.GroupID)), + MemorySize: int64(w.GetColumnGroupWrittenUncompressed(columnGroup.GroupID)), + LogPath: w.GetWrittenPaths(columnGroupID), EntriesNum: w.GetWrittenRowNum(), TimestampFrom: tsFrom, TimestampTo: tsTo, @@ -171,7 +170,7 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m // split by row average size func (bw *BulkPackWriterV2) splitInsertData(insertData []*storage.InsertData, splitThresHold int64) ([]storagecommon.ColumnGroup, error) { groups := make([]storagecommon.ColumnGroup, 0) - shortColumnGroup := storagecommon.ColumnGroup{Columns: make([]int, 0)} + shortColumnGroup := storagecommon.ColumnGroup{Columns: make([]int, 0), GroupID: storagecommon.DefaultShortColumnGroupID} memorySizes := make(map[storage.FieldID]int64, len(insertData[0].Data)) rowNums := make(map[storage.FieldID]int64, len(insertData[0].Data)) for _, data := range insertData { @@ -194,15 +193,15 @@ func (bw *BulkPackWriterV2) splitInsertData(insertData []*storage.InsertData, sp } // Check if the field is a vector type if storage.IsVectorDataType(field.DataType) || field.DataType == schemapb.DataType_Text { - groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}}) + groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}, GroupID: field.GetFieldID()}) } else if rowNums[field.FieldID] != 0 && memorySizes[field.FieldID]/rowNums[field.FieldID] >= splitThresHold { - groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}}) + groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}, GroupID: field.GetFieldID()}) } else { shortColumnGroup.Columns = append(shortColumnGroup.Columns, i) } } if len(shortColumnGroup.Columns) > 0 { - groups = append(groups, shortColumnGroup) + groups = append([]storagecommon.ColumnGroup{shortColumnGroup}, groups...) 
} return groups, nil } diff --git a/internal/flushcommon/syncmgr/pack_writer_v2_test.go b/internal/flushcommon/syncmgr/pack_writer_v2_test.go index cb7b1d4104..61de0824aa 100644 --- a/internal/flushcommon/syncmgr/pack_writer_v2_test.go +++ b/internal/flushcommon/syncmgr/pack_writer_v2_test.go @@ -127,6 +127,7 @@ func (s *PackWriterV2Suite) TestPackWriterV2_Write() { s.NoError(err) s.Equal(gotInserts[0].Binlogs[0].GetEntriesNum(), int64(rows)) s.Equal(gotInserts[0].Binlogs[0].GetLogPath(), "/tmp/insert_log/123/456/789/0/1") + s.Equal(gotInserts[101].Binlogs[0].GetLogPath(), "/tmp/insert_log/123/456/789/101/2") } func (s *PackWriterV2Suite) TestWriteEmptyInsertData() { diff --git a/internal/storage/rw_test.go b/internal/storage/rw_test.go index 0e475b3ca1..c523f16153 100644 --- a/internal/storage/rw_test.go +++ b/internal/storage/rw_test.go @@ -100,28 +100,37 @@ func (s *PackedBinlogRecordSuite) SetupTest() { s.chunkSize = uint64(1024) } -func genTestColumnGroups(schema *schemapb.CollectionSchema) []storagecommon.ColumnGroup { - fieldBinlogs := make([]*datapb.FieldBinlog, 0) - for i, field := range schema.Fields { - fieldBinlogs = append(fieldBinlogs, &datapb.FieldBinlog{ - FieldID: field.FieldID, - Binlogs: []*datapb.Binlog{ - { - EntriesNum: int64(10 * (i + 1)), - LogSize: int64(1000 / (i + 1)), - }, - }, - }) - } - return storagecommon.SplitByFieldSize(fieldBinlogs, 10) -} - func (s *PackedBinlogRecordSuite) TestPackedBinlogRecordIntegration() { paramtable.Get().Save(paramtable.Get().CommonCfg.StorageType.Key, "local") s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil) rows := 10000 readBatchSize := 1024 - columnGroups := genTestColumnGroups(s.schema) + columnGroups := []storagecommon.ColumnGroup{ + { + GroupID: 0, + Columns: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}, + }, + { + GroupID: 102, + Columns: []int{13}, + }, + { + GroupID: 103, + Columns: []int{14}, + }, + { + GroupID: 104, + Columns: []int{15}, + }, + { + GroupID: 105, + Columns: []int{16}, + }, + { + GroupID: 106, + Columns: []int{17}, + }, + } wOption := []RwOption{ WithUploader(func(ctx context.Context, kvs map[string][]byte) error { return s.mockBinlogIO.Upload(ctx, kvs) @@ -197,7 +206,20 @@ func (s *PackedBinlogRecordSuite) TestPackedBinlogRecordIntegration() { func (s *PackedBinlogRecordSuite) TestGenerateBM25Stats() { s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil) s.schema = genCollectionSchemaWithBM25() - columnGroups := genTestColumnGroups(s.schema) + columnGroups := []storagecommon.ColumnGroup{ + { + GroupID: 0, + Columns: []int{0, 1, 2}, + }, + { + GroupID: 101, + Columns: []int{3}, + }, + { + GroupID: 102, + Columns: []int{4}, + }, + } wOption := []RwOption{ WithUploader(func(ctx context.Context, kvs map[string][]byte) error { return s.mockBinlogIO.Upload(ctx, kvs) @@ -251,7 +273,12 @@ func (s *PackedBinlogRecordSuite) TestNoPrimaryKeyError() { s.schema = &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{ {FieldID: 13, Name: "field12", DataType: schemapb.DataType_JSON}, }} - columnGroups := genTestColumnGroups(s.schema) + columnGroups := []storagecommon.ColumnGroup{ + { + GroupID: 0, + Columns: []int{0}, + }, + } wOption := []RwOption{ WithVersion(StorageV2), WithColumnGroups(columnGroups), @@ -264,7 +291,12 @@ func (s *PackedBinlogRecordSuite) TestConvertArrowSchemaError() { s.schema = &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{ {FieldID: 14, Name: "field13", DataType: schemapb.DataType_Float16Vector, TypeParams: 
[]*commonpb.KeyValuePair{}}, }} - columnGroups := genTestColumnGroups(s.schema) + columnGroups := []storagecommon.ColumnGroup{ + { + GroupID: 0, + Columns: []int{0}, + }, + } wOption := []RwOption{ WithVersion(StorageV2), WithColumnGroups(columnGroups), @@ -282,7 +314,12 @@ func (s *PackedBinlogRecordSuite) TestEmptyBinlog() { } func (s *PackedBinlogRecordSuite) TestAllocIDExhausedError() { - columnGroups := genTestColumnGroups(s.schema) + columnGroups := []storagecommon.ColumnGroup{ + { + GroupID: 0, + Columns: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17}, + }, + } wOption := []RwOption{ WithVersion(StorageV2), WithColumnGroups(columnGroups), diff --git a/internal/storage/serde_events_v2.go b/internal/storage/serde_events_v2.go index 57baf409ce..cbf4a84148 100644 --- a/internal/storage/serde_events_v2.go +++ b/internal/storage/serde_events_v2.go @@ -141,12 +141,12 @@ type packedRecordWriter struct { bufferSize int64 columnGroups []storagecommon.ColumnGroup bucketName string - paths []string + pathsMap map[typeutil.UniqueID]string schema *schemapb.CollectionSchema arrowSchema *arrow.Schema rowNum int64 writtenUncompressed uint64 - columnGroupUncompressed []uint64 + columnGroupUncompressed map[typeutil.UniqueID]uint64 storageConfig *indexpb.StorageConfig } @@ -166,9 +166,9 @@ func (pw *packedRecordWriter) Write(r Record) error { for col, arr := range rec.Columns() { size := arr.Data().SizeInBytes() pw.writtenUncompressed += size - for columnGroup, group := range pw.columnGroups { - if lo.Contains(group.Columns, col) { - pw.columnGroupUncompressed[columnGroup] += size + for _, columnGroup := range pw.columnGroups { + if lo.Contains(columnGroup.Columns, col) { + pw.columnGroupUncompressed[columnGroup.GroupID] += size break } } @@ -181,12 +181,18 @@ func (pw *packedRecordWriter) GetWrittenUncompressed() uint64 { return pw.writtenUncompressed } -func (pw *packedRecordWriter) GetColumnGroupWrittenUncompressed(columnGroup int) uint64 { - return pw.columnGroupUncompressed[columnGroup] +func (pw *packedRecordWriter) GetColumnGroupWrittenUncompressed(columnGroup typeutil.UniqueID) uint64 { + if size, ok := pw.columnGroupUncompressed[columnGroup]; ok { + return size + } + return 0 } -func (pw *packedRecordWriter) GetWrittenPaths() []string { - return pw.paths +func (pw *packedRecordWriter) GetWrittenPaths(columnGroup typeutil.UniqueID) string { + if path, ok := pw.pathsMap[columnGroup]; ok { + return path + } + return "" } func (pw *packedRecordWriter) GetWrittenRowNum() int64 { @@ -219,14 +225,23 @@ func NewPackedRecordWriter(bucketName string, paths []string, schema *schemapb.C return nil, merr.WrapErrServiceInternal( fmt.Sprintf("can not new packed record writer %s", err.Error())) } - columnGroupUncompressed := make([]uint64, len(columnGroups)) + columnGroupUncompressed := make(map[typeutil.UniqueID]uint64) + pathsMap := make(map[typeutil.UniqueID]string) + if len(paths) != len(columnGroups) { + return nil, merr.WrapErrParameterInvalid(len(paths), len(columnGroups), + "paths length is not equal to column groups length for packed record writer") + } + for i, columnGroup := range columnGroups { + columnGroupUncompressed[columnGroup.GroupID] = 0 + pathsMap[columnGroup.GroupID] = paths[i] + } return &packedRecordWriter{ writer: writer, schema: schema, arrowSchema: arrowSchema, bufferSize: bufferSize, bucketName: bucketName, - paths: paths, + pathsMap: pathsMap, columnGroups: columnGroups, columnGroupUncompressed: columnGroupUncompressed, storageConfig: storageConfig, @@ -236,12 
+251,12 @@ func NewPackedRecordWriter(bucketName string, paths []string, schema *schemapb.C func NewPackedSerializeWriter(bucketName string, paths []string, schema *schemapb.CollectionSchema, bufferSize int64, multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup, batchSize int, ) (*SerializeWriterImpl[*Value], error) { - PackedBinlogRecordWriter, err := NewPackedRecordWriter(bucketName, paths, schema, bufferSize, multiPartUploadSize, columnGroups, nil) + packedRecordWriter, err := NewPackedRecordWriter(bucketName, paths, schema, bufferSize, multiPartUploadSize, columnGroups, nil) if err != nil { return nil, merr.WrapErrServiceInternal( fmt.Sprintf("can not new packed record writer %s", err.Error())) } - return NewSerializeRecordWriter(PackedBinlogRecordWriter, func(v []*Value) (Record, error) { + return NewSerializeRecordWriter(packedRecordWriter, func(v []*Value) (Record, error) { return ValueSerializer(v, schema.Fields) }, batchSize), nil } @@ -332,21 +347,21 @@ func (pw *PackedBinlogRecordWriter) Write(r Record) error { func (pw *PackedBinlogRecordWriter) splitColumnByRecord(r Record) []storagecommon.ColumnGroup { groups := make([]storagecommon.ColumnGroup, 0) - shortColumnGroup := storagecommon.ColumnGroup{Columns: make([]int, 0)} + shortColumnGroup := storagecommon.ColumnGroup{Columns: make([]int, 0), GroupID: storagecommon.DefaultShortColumnGroupID} for i, field := range pw.schema.Fields { arr := r.Column(field.FieldID) size := arr.Data().SizeInBytes() rows := uint64(arr.Len()) if IsVectorDataType(field.DataType) || field.DataType == schemapb.DataType_Text { - groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}}) + groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}, GroupID: field.GetFieldID()}) } else if rows != 0 && int64(size/rows) >= packed.ColumnGroupSizeThreshold { - groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}}) + groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}, GroupID: field.GetFieldID()}) } else { shortColumnGroup.Columns = append(shortColumnGroup.Columns, i) } } if len(shortColumnGroup.Columns) > 0 { - groups = append(groups, shortColumnGroup) + groups = append([]storagecommon.ColumnGroup{shortColumnGroup}, groups...) 
} return groups } @@ -361,8 +376,8 @@ func (pw *PackedBinlogRecordWriter) initWriters(r Record) error { return err } paths := []string{} - for columnGroup := range pw.columnGroups { - path := metautil.BuildInsertLogPath(pw.rootPath, pw.collectionID, pw.partitionID, pw.segmentID, typeutil.UniqueID(columnGroup), logIdStart) + for _, columnGroup := range pw.columnGroups { + path := metautil.BuildInsertLogPath(pw.rootPath, pw.collectionID, pw.partitionID, pw.segmentID, columnGroup.GroupID, logIdStart) paths = append(paths, path) logIdStart++ } @@ -401,17 +416,17 @@ func (pw *PackedBinlogRecordWriter) finalizeBinlogs() { if pw.fieldBinlogs == nil { pw.fieldBinlogs = make(map[FieldID]*datapb.FieldBinlog, len(pw.columnGroups)) } - for columnGroup := range pw.columnGroups { - columnGroupID := typeutil.UniqueID(columnGroup) + for _, columnGroup := range pw.columnGroups { + columnGroupID := columnGroup.GroupID if _, exists := pw.fieldBinlogs[columnGroupID]; !exists { pw.fieldBinlogs[columnGroupID] = &datapb.FieldBinlog{ FieldID: columnGroupID, } } pw.fieldBinlogs[columnGroupID].Binlogs = append(pw.fieldBinlogs[columnGroupID].Binlogs, &datapb.Binlog{ - LogSize: int64(pw.writer.columnGroupUncompressed[columnGroup]), // TODO: should provide the log size of each column group file in storage v2 - MemorySize: int64(pw.writer.columnGroupUncompressed[columnGroup]), - LogPath: pw.writer.GetWrittenPaths()[columnGroupID], + LogSize: int64(pw.writer.GetColumnGroupWrittenUncompressed(columnGroupID)), + MemorySize: int64(pw.writer.GetColumnGroupWrittenUncompressed(columnGroupID)), + LogPath: pw.writer.GetWrittenPaths(columnGroupID), EntriesNum: pw.writer.GetWrittenRowNum(), TimestampFrom: pw.tsFrom, TimestampTo: pw.tsTo, diff --git a/internal/storage/serde_events_v2_test.go b/internal/storage/serde_events_v2_test.go index bbcbde6a42..e00f5510d8 100644 --- a/internal/storage/serde_events_v2_test.go +++ b/internal/storage/serde_events_v2_test.go @@ -43,7 +43,7 @@ func TestPackedSerde(t *testing.T) { reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs)) assert.NoError(t, err) - group := storagecommon.ColumnGroup{} + group := storagecommon.ColumnGroup{GroupID: storagecommon.DefaultShortColumnGroupID} for i := 0; i < len(schema.Fields); i++ { group.Columns = append(group.Columns, i) } diff --git a/internal/storagecommon/column_group_splitter.go b/internal/storagecommon/column_group_splitter.go index 23f467e876..d2bb1c14a8 100644 --- a/internal/storagecommon/column_group_splitter.go +++ b/internal/storagecommon/column_group_splitter.go @@ -17,33 +17,15 @@ package storagecommon import ( - "github.com/samber/lo" + "github.com/milvus-io/milvus/pkg/v2/util/typeutil" +) - "github.com/milvus-io/milvus/pkg/v2/proto/datapb" +const ( + // column group id for short columns + DefaultShortColumnGroupID = 0 ) type ColumnGroup struct { + GroupID typeutil.UniqueID Columns []int // column indices } - -// split by row average size -func SplitByFieldSize(fieldBinlogs []*datapb.FieldBinlog, splitThresHold int64) []ColumnGroup { - groups := make([]ColumnGroup, 0) - shortColumnGroup := ColumnGroup{Columns: make([]int, 0)} - for i, fieldBinlog := range fieldBinlogs { - if len(fieldBinlog.Binlogs) == 0 { - continue - } - totalSize := lo.SumBy(fieldBinlog.Binlogs, func(b *datapb.Binlog) int64 { return b.LogSize }) - totalNumRows := lo.SumBy(fieldBinlog.Binlogs, func(b *datapb.Binlog) int64 { return b.EntriesNum }) - if totalSize/totalNumRows >= splitThresHold { - groups = append(groups, ColumnGroup{Columns: 
[]int{i}}) - } else { - shortColumnGroup.Columns = append(shortColumnGroup.Columns, i) - } - } - if len(shortColumnGroup.Columns) > 0 { - groups = append(groups, shortColumnGroup) - } - return groups -} diff --git a/internal/storagecommon/column_group_splitter_test.go b/internal/storagecommon/column_group_splitter_test.go deleted file mode 100644 index de1b3491e0..0000000000 --- a/internal/storagecommon/column_group_splitter_test.go +++ /dev/null @@ -1,127 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storagecommon - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/milvus-io/milvus/pkg/v2/proto/datapb" -) - -func TestSplitByFieldSize(t *testing.T) { - tests := []struct { - name string - fieldBinlogs []*datapb.FieldBinlog - splitThresHold int64 - expected []ColumnGroup - }{ - { - name: "Empty input", - fieldBinlogs: []*datapb.FieldBinlog{}, - splitThresHold: 100, - expected: []ColumnGroup{}, - }, - { - name: "Empty binlogs", - fieldBinlogs: []*datapb.FieldBinlog{{FieldID: 0, Binlogs: []*datapb.Binlog{}}}, - splitThresHold: 100, - expected: []ColumnGroup{}, - }, - { - name: "above threshold", - fieldBinlogs: []*datapb.FieldBinlog{ - { - FieldID: 0, - Binlogs: []*datapb.Binlog{ - {LogSize: 1000, EntriesNum: 10}, - }, - }, - { - FieldID: 1, - Binlogs: []*datapb.Binlog{ - {LogSize: 2000, EntriesNum: 10}, - }, - }, - }, - splitThresHold: 50, - expected: []ColumnGroup{ - {Columns: []int{0}}, - {Columns: []int{1}}, - }, - }, - { - name: "one field", - fieldBinlogs: []*datapb.FieldBinlog{ - { - FieldID: 0, - Binlogs: []*datapb.Binlog{ - {LogSize: 100, EntriesNum: 10}, - }, - }, - }, - splitThresHold: 50, - expected: []ColumnGroup{ - {Columns: []int{0}}, - }, - }, - { - name: "Multiple fields, mixed sizes", - fieldBinlogs: []*datapb.FieldBinlog{ - { - FieldID: 0, - Binlogs: []*datapb.Binlog{ // (above) - {LogSize: 500, EntriesNum: 5}, - {LogSize: 500, EntriesNum: 5}, - }, - }, - { - FieldID: 1, - Binlogs: []*datapb.Binlog{ - {LogSize: 200, EntriesNum: 20}, // (below) - }, - }, - { - FieldID: 2, - Binlogs: []*datapb.Binlog{ - {LogSize: 500, EntriesNum: 10}, // (threshold) - }, - }, - { - FieldID: 3, - Binlogs: []*datapb.Binlog{ - {LogSize: 400, EntriesNum: 10}, // (below) - }, - }, - }, - splitThresHold: 50, - expected: []ColumnGroup{ - {Columns: []int{0}}, - {Columns: []int{2}}, - {Columns: []int{1, 3}}, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - result := SplitByFieldSize(tt.fieldBinlogs, tt.splitThresHold) - assert.Equal(t, tt.expected, result) - }) - } -} diff --git a/internal/storagev2/packed/packed_test.go b/internal/storagev2/packed/packed_test.go index e5127b7902..547c531af2 100644 --- a/internal/storagev2/packed/packed_test.go +++ 
b/internal/storagev2/packed/packed_test.go @@ -79,7 +79,7 @@ func (suite *PackedTestSuite) TestPackedOneFile() { batches := 100 paths := []string{"/tmp/100"} - columnGroups := []storagecommon.ColumnGroup{{Columns: []int{0, 1, 2}}} + columnGroups := []storagecommon.ColumnGroup{{Columns: []int{0, 1, 2}, GroupID: storagecommon.DefaultShortColumnGroupID}} bufferSize := int64(10 * 1024 * 1024) // 10MB multiPartUploadSize := int64(0) pw, err := NewPackedWriter(paths, suite.schema, bufferSize, multiPartUploadSize, columnGroups, nil) @@ -131,7 +131,7 @@ func (suite *PackedTestSuite) TestPackedMultiFiles() { rec := b.NewRecord() defer rec.Release() paths := []string{"/tmp/100", "/tmp/101"} - columnGroups := []storagecommon.ColumnGroup{{Columns: []int{2}}, {Columns: []int{0, 1}}} + columnGroups := []storagecommon.ColumnGroup{{Columns: []int{2}, GroupID: 2}, {Columns: []int{0, 1}, GroupID: storagecommon.DefaultShortColumnGroupID}} bufferSize := int64(-1) // unlimited multiPartUploadSize := int64(0) pw, err := NewPackedWriter(paths, suite.schema, bufferSize, multiPartUploadSize, columnGroups, nil)
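
Note (not part of the patch): the core behavioral change in splitInsertData and splitColumnByRecord is that each storagecommon.ColumnGroup now carries a GroupID — DefaultShortColumnGroupID (0) for the merged short-column group, which is prepended so it is always the first file written, and the field ID for vector/text or oversized fields. Below is a minimal standalone Go sketch of that rule, using simplified stand-in types (field, threshold values) that are illustrative only and not the actual schemapb types.

package main

import "fmt"

// field is a simplified stand-in for schemapb.FieldSchema, used only in this sketch.
type field struct {
	FieldID  int64
	IsVector bool
	AvgSize  int64 // average serialized bytes per row
}

// ColumnGroup mirrors storagecommon.ColumnGroup after this patch:
// a GroupID plus the column indices it covers.
type ColumnGroup struct {
	GroupID int64
	Columns []int
}

const defaultShortColumnGroupID = 0

// splitColumns sketches the grouping rule: vector/text fields and fields whose
// average row size reaches the threshold become standalone groups keyed by
// their field ID; everything else is merged into the short-column group,
// which uses GroupID 0 and is prepended to the result.
func splitColumns(fields []field, threshold int64) []ColumnGroup {
	groups := make([]ColumnGroup, 0, len(fields))
	short := ColumnGroup{GroupID: defaultShortColumnGroupID, Columns: make([]int, 0)}
	for i, f := range fields {
		if f.IsVector || f.AvgSize >= threshold {
			groups = append(groups, ColumnGroup{GroupID: f.FieldID, Columns: []int{i}})
		} else {
			short.Columns = append(short.Columns, i)
		}
	}
	if len(short.Columns) > 0 {
		groups = append([]ColumnGroup{short}, groups...)
	}
	return groups
}

func main() {
	fields := []field{
		{FieldID: 100, AvgSize: 8},     // primary key -> short group
		{FieldID: 101, AvgSize: 16},    // scalar -> short group
		{FieldID: 102, IsVector: true}, // vector -> its own group 102
		{FieldID: 103, AvgSize: 4096},  // wide column -> its own group 103
	}
	for _, g := range splitColumns(fields, 1024) {
		fmt.Printf("group %d -> columns %v\n", g.GroupID, g.Columns)
	}
	// Output:
	// group 0 -> columns [0 1]
	// group 102 -> columns [2]
	// group 103 -> columns [3]
}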
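
A second hedged sketch, this time of the writer-side bookkeeping change in packedRecordWriter: written paths and uncompressed sizes are now keyed by GroupID maps rather than by the slice index of the column group, so paths built with BuildInsertLogPath(..., GroupID, ...) can be looked up directly when finalizing field binlogs, and the writer validates that one pre-built path exists per column group. The types below are simplified stand-ins for illustration, not the actual writer in serde_events_v2.go.

package main

import "fmt"

type uniqueID = int64

// groupWriterStats stands in for the map-based fields added to packedRecordWriter
// (pathsMap and columnGroupUncompressed), keyed by GroupID instead of slice index.
type groupWriterStats struct {
	pathsMap     map[uniqueID]string
	uncompressed map[uniqueID]uint64
}

func newGroupWriterStats(groupIDs []uniqueID, paths []string) (*groupWriterStats, error) {
	// Mirrors the new invariant checked in NewPackedRecordWriter:
	// the number of pre-built paths must equal the number of column groups.
	if len(paths) != len(groupIDs) {
		return nil, fmt.Errorf("paths length %d is not equal to column groups length %d", len(paths), len(groupIDs))
	}
	s := &groupWriterStats{
		pathsMap:     make(map[uniqueID]string, len(groupIDs)),
		uncompressed: make(map[uniqueID]uint64, len(groupIDs)),
	}
	for i, id := range groupIDs {
		s.pathsMap[id] = paths[i]
		s.uncompressed[id] = 0
	}
	return s, nil
}

// Lookups return zero values for unknown group IDs, matching the defensive
// GetWrittenPaths / GetColumnGroupWrittenUncompressed accessors in the patch.
func (s *groupWriterStats) GetWrittenPath(groupID uniqueID) string {
	return s.pathsMap[groupID]
}

func (s *groupWriterStats) GetColumnGroupWrittenUncompressed(groupID uniqueID) uint64 {
	return s.uncompressed[groupID]
}

func main() {
	stats, err := newGroupWriterStats(
		[]uniqueID{0, 102},
		[]string{"/tmp/insert_log/123/456/789/0/1", "/tmp/insert_log/123/456/789/102/2"},
	)
	if err != nil {
		panic(err)
	}
	stats.uncompressed[102] += 4096
	fmt.Println(stats.GetWrittenPath(102), stats.GetColumnGroupWrittenUncompressed(102))
}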