enhance: [StorageV2] field id as meta path for wide column (#42787)

related: #39173

Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
sthuang 2025-06-19 15:00:38 +08:00, committed by GitHub
parent 4ba177cd2c
commit 4a0a2441f2
10 changed files with 178 additions and 283 deletions
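
In short: for StorageV2 packed (wide-column) binlogs, the column group id used in binlog meta paths is now the field id itself: vector, text, and oversized scalar columns each get a standalone group keyed by their field id, while the remaining short columns are merged into the reserved group 0 (DEFAULT_SHORT_COLUMN_GROUP_ID / DefaultShortColumnGroupID). A minimal Go sketch of the resulting path layout, assuming the rootPath/insert_log/collectionID/partitionID/segmentID/groupID/logID shape exercised by the tests below; insertLogPath here is a stand-in for metautil.BuildInsertLogPath, not the real helper:

package main

import "fmt"

// DefaultShortColumnGroupID mirrors the constant introduced by this change:
// merged short columns share group 0, every wide column uses its field id.
const DefaultShortColumnGroupID int64 = 0

// insertLogPath sketches the layout checked by the pack writer test
// (rootPath/insert_log/collectionID/partitionID/segmentID/columnGroupID/logID).
func insertLogPath(rootPath string, collectionID, partitionID, segmentID, groupID, logID int64) string {
    return fmt.Sprintf("%s/insert_log/%d/%d/%d/%d/%d", rootPath, collectionID, partitionID, segmentID, groupID, logID)
}

func main() {
    // The short-column group keeps id 0 in its path ...
    fmt.Println(insertLogPath("/tmp", 123, 456, 789, DefaultShortColumnGroupID, 1)) // /tmp/insert_log/123/456/789/0/1
    // ... while a wide column (e.g. field id 101) is addressed by its field id.
    fmt.Println(insertLogPath("/tmp", 123, 456, 789, 101, 2)) // /tmp/insert_log/123/456/789/101/2
}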

View File

@@ -102,4 +102,6 @@ const std::string INDEX_NUM_ROWS_KEY = "index_num_rows";
const int64_t STORAGE_V1 = 1;
const int64_t STORAGE_V2 = 2;
const std::string UNKNOW_CAST_FUNCTION_NAME = "unknown";
const int64_t DEFAULT_SHORT_COLUMN_GROUP_ID = 0;

View File

@@ -1041,88 +1041,74 @@ GetFieldDatasFromStorageV2(std::vector<std::vector<std::string>>& remote_files,
column_group_files[group_id] = remote_chunk_files;
}
for (auto& [column_group_id, remote_chunk_files] : column_group_files) {
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
// read first file to get path and column offset of the field id
auto file_reader = std::make_shared<milvus_storage::FileRowGroupReader>(
fs, remote_chunk_files[0]);
std::shared_ptr<milvus_storage::PackedFileMetadata> metadata =
file_reader->file_metadata();
std::vector<std::string> remote_chunk_files;
if (column_group_files.find(field_id) == column_group_files.end()) {
remote_chunk_files = column_group_files[DEFAULT_SHORT_COLUMN_GROUP_ID];
} else {
remote_chunk_files = column_group_files[field_id];
}
auto field_id_mapping = metadata->GetFieldIDMapping();
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
// set up channel for arrow reader
auto field_data_info = FieldDataInfo();
auto parallel_degree =
static_cast<uint64_t>(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
field_data_info.arrow_reader_channel->set_capacity(parallel_degree);
auto& pool = ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
for (auto& column_group_file : remote_chunk_files) {
// get all row groups for each file
std::vector<std::vector<int64_t>> row_group_lists;
auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
fs, column_group_file);
auto field_id_mapping = reader->file_metadata()->GetFieldIDMapping();
auto it = field_id_mapping.find(field_id);
AssertInfo(it != field_id_mapping.end(),
"field id {} not found in field id mapping",
field_id);
auto column_offset = it->second;
AssertInfo(column_offset.path_index < remote_files.size(),
"column offset path index {} is out of range",
column_offset.path_index);
if (column_offset.path_index != column_group_id) {
LOG_INFO("Skip group id {} since target field shall be in group {}",
column_group_id,
column_offset.path_index);
continue;
}
// set up channel for arrow reader
auto field_data_info = FieldDataInfo();
auto parallel_degree = static_cast<uint64_t>(
DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE);
field_data_info.arrow_reader_channel->set_capacity(parallel_degree);
auto row_group_num =
reader->file_metadata()->GetRowGroupMetadataVector().size();
std::vector<int64_t> all_row_groups(row_group_num);
std::iota(all_row_groups.begin(), all_row_groups.end(), 0);
row_group_lists.push_back(all_row_groups);
auto& pool =
ThreadPools::GetThreadPool(milvus::ThreadPoolPriority::MIDDLE);
// create a schema with only the field id
auto field_schema =
reader->schema()->field(column_offset.col_index)->Copy();
auto arrow_schema = arrow::schema({field_schema});
for (auto& column_group_file : remote_chunk_files) {
// get all row groups for each file
std::vector<std::vector<int64_t>> row_group_lists;
auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
fs, column_group_file);
auto row_group_num =
reader->file_metadata()->GetRowGroupMetadataVector().size();
std::vector<int64_t> all_row_groups(row_group_num);
std::iota(all_row_groups.begin(), all_row_groups.end(), 0);
row_group_lists.push_back(all_row_groups);
// create a schema with only the field id
auto field_schema =
file_reader->schema()->field(column_offset.col_index)->Copy();
auto arrow_schema = arrow::schema({field_schema});
// split row groups for parallel reading
auto strategy =
std::make_unique<segcore::ParallelDegreeSplitStrategy>(
parallel_degree);
auto load_future = pool.Submit([&]() {
return LoadWithStrategy(
std::vector<std::string>{column_group_file},
field_data_info.arrow_reader_channel,
DEFAULT_FIELD_MAX_MEMORY_LIMIT,
std::move(strategy),
row_group_lists,
nullptr);
});
// read field data from channel
std::shared_ptr<milvus::ArrowDataWrapper> r;
while (field_data_info.arrow_reader_channel->pop(r)) {
size_t num_rows = 0;
std::vector<std::shared_ptr<arrow::ChunkedArray>>
chunked_arrays;
for (const auto& table : r->arrow_tables) {
num_rows += table->num_rows();
chunked_arrays.push_back(
table->column(column_offset.col_index));
}
auto field_data = storage::CreateFieldData(
data_type, field_schema->nullable(), dim, num_rows);
for (const auto& chunked_array : chunked_arrays) {
field_data->FillFieldData(chunked_array);
}
field_data_list.push_back(field_data);
// split row groups for parallel reading
auto strategy = std::make_unique<segcore::ParallelDegreeSplitStrategy>(
parallel_degree);
auto load_future = pool.Submit([&]() {
return LoadWithStrategy(std::vector<std::string>{column_group_file},
field_data_info.arrow_reader_channel,
DEFAULT_FIELD_MAX_MEMORY_LIMIT,
std::move(strategy),
row_group_lists,
nullptr);
});
// read field data from channel
std::shared_ptr<milvus::ArrowDataWrapper> r;
while (field_data_info.arrow_reader_channel->pop(r)) {
size_t num_rows = 0;
std::vector<std::shared_ptr<arrow::ChunkedArray>> chunked_arrays;
for (const auto& table : r->arrow_tables) {
num_rows += table->num_rows();
chunked_arrays.push_back(
table->column(column_offset.col_index));
}
auto field_data = storage::CreateFieldData(
data_type, field_schema->nullable(), dim, num_rows);
for (const auto& chunked_array : chunked_arrays) {
field_data->FillFieldData(chunked_array);
}
field_data_list.push_back(field_data);
}
}
return field_data_list;
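
The rewritten read path above mirrors that scheme: GetFieldDatasFromStorageV2 now selects the candidate chunk files by field id (falling back to the short-column group 0) and resolves the field's column offset from each file's own field-id mapping, instead of reading the mapping once from the first file of every group and skipping non-matching groups. A simplified Go sketch of that lookup, assuming toy map types in place of the milvus_storage metadata API:

package main

import "fmt"

// columnOffset is a simplified stand-in for the entry returned by the packed
// file metadata's GetFieldIDMapping (path index + column index).
type columnOffset struct {
    pathIndex int // which column group file the field lives in
    colIndex  int // column position inside that file
}

const defaultShortColumnGroupID int64 = 0 // merged short-column group

// resolveField sketches the lookup order used above: prefer the file list keyed
// by the field id itself, otherwise fall back to the short-column group, then
// read the field's column offset from the per-file mapping.
func resolveField(groupFiles map[int64][]string, mapping map[int64]columnOffset, fieldID int64) ([]string, columnOffset, bool) {
    files, ok := groupFiles[fieldID]
    if !ok {
        files = groupFiles[defaultShortColumnGroupID]
    }
    off, found := mapping[fieldID]
    return files, off, found
}

func main() {
    groupFiles := map[int64][]string{0: {"group0_chunk0"}, 101: {"field101_chunk0"}}
    mapping := map[int64]columnOffset{100: {0, 2}, 101: {1, 0}}
    files, off, _ := resolveField(groupFiles, mapping, 100) // scalar field 100 lives in group 0, column 2
    fmt.Println(files, off)
}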

View File

@@ -118,9 +118,8 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m
logs := make(map[int64]*datapb.FieldBinlog)
paths := make([]string, 0)
for columnGroup := range columnGroups {
columnGroupID := typeutil.UniqueID(columnGroup)
path := metautil.BuildInsertLogPath(bw.chunkManager.RootPath(), pack.collectionID, pack.partitionID, pack.segmentID, columnGroupID, bw.nextID())
for _, columnGroup := range columnGroups {
path := metautil.BuildInsertLogPath(bw.chunkManager.RootPath(), pack.collectionID, pack.partitionID, pack.segmentID, columnGroup.GroupID, bw.nextID())
paths = append(paths, path)
}
tsArray := rec.Column(common.TimeStampField).(*array.Int64)
@@ -146,15 +145,15 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m
if err = w.Write(rec); err != nil {
return nil, err
}
for columnGroup := range columnGroups {
columnGroupID := typeutil.UniqueID(columnGroup)
for _, columnGroup := range columnGroups {
columnGroupID := columnGroup.GroupID
logs[columnGroupID] = &datapb.FieldBinlog{
FieldID: columnGroupID,
Binlogs: []*datapb.Binlog{
{
LogSize: int64(w.GetColumnGroupWrittenUncompressed(columnGroup)),
MemorySize: int64(w.GetColumnGroupWrittenUncompressed(columnGroup)),
LogPath: w.GetWrittenPaths()[columnGroupID],
LogSize: int64(w.GetColumnGroupWrittenUncompressed(columnGroup.GroupID)),
MemorySize: int64(w.GetColumnGroupWrittenUncompressed(columnGroup.GroupID)),
LogPath: w.GetWrittenPaths(columnGroupID),
EntriesNum: w.GetWrittenRowNum(),
TimestampFrom: tsFrom,
TimestampTo: tsTo,
@@ -171,7 +170,7 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m
// split by row average size
func (bw *BulkPackWriterV2) splitInsertData(insertData []*storage.InsertData, splitThresHold int64) ([]storagecommon.ColumnGroup, error) {
groups := make([]storagecommon.ColumnGroup, 0)
shortColumnGroup := storagecommon.ColumnGroup{Columns: make([]int, 0)}
shortColumnGroup := storagecommon.ColumnGroup{Columns: make([]int, 0), GroupID: storagecommon.DefaultShortColumnGroupID}
memorySizes := make(map[storage.FieldID]int64, len(insertData[0].Data))
rowNums := make(map[storage.FieldID]int64, len(insertData[0].Data))
for _, data := range insertData {
@@ -194,15 +193,15 @@ func (bw *BulkPackWriterV2) splitInsertData(insertData []*storage.InsertData, sp
}
// Check if the field is a vector type
if storage.IsVectorDataType(field.DataType) || field.DataType == schemapb.DataType_Text {
groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}})
groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}, GroupID: field.GetFieldID()})
} else if rowNums[field.FieldID] != 0 && memorySizes[field.FieldID]/rowNums[field.FieldID] >= splitThresHold {
groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}})
groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}, GroupID: field.GetFieldID()})
} else {
shortColumnGroup.Columns = append(shortColumnGroup.Columns, i)
}
}
if len(shortColumnGroup.Columns) > 0 {
groups = append(groups, shortColumnGroup)
groups = append([]storagecommon.ColumnGroup{shortColumnGroup}, groups...)
}
return groups, nil
}
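
For reference, the grouping rule that produces these group ids (the same rule appears again in splitColumnByRecord below): vector and text fields, and fields whose average row size reaches the split threshold, each become a standalone group whose GroupID is the field id; everything else is merged into the short-column group with GroupID 0, which is now prepended so it always comes first. A self-contained sketch with simplified field descriptors (not the schemapb types):

package main

import "fmt"

type fieldInfo struct {
    fieldID     int64
    isVector    bool
    isText      bool
    avgRowBytes int64
}

type columnGroup struct {
    groupID int64
    columns []int
}

const defaultShortColumnGroupID int64 = 0

// splitColumns mirrors the splitInsertData / splitColumnByRecord rule:
// wide columns get a group keyed by field id, short columns are merged
// into group 0, and the merged group is placed first.
func splitColumns(fields []fieldInfo, threshold int64) []columnGroup {
    groups := make([]columnGroup, 0)
    short := columnGroup{groupID: defaultShortColumnGroupID}
    for i, f := range fields {
        if f.isVector || f.isText || f.avgRowBytes >= threshold {
            groups = append(groups, columnGroup{groupID: f.fieldID, columns: []int{i}})
        } else {
            short.columns = append(short.columns, i)
        }
    }
    if len(short.columns) > 0 {
        groups = append([]columnGroup{short}, groups...)
    }
    return groups
}

func main() {
    fields := []fieldInfo{
        {fieldID: 100, avgRowBytes: 8},    // primary key -> short group
        {fieldID: 101, avgRowBytes: 4096}, // oversized scalar -> own group 101
        {fieldID: 102, isVector: true},    // vector -> own group 102
    }
    fmt.Println(splitColumns(fields, 1024)) // [{0 [0]} {101 [1]} {102 [2]}]
}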

View File

@@ -127,6 +127,7 @@ func (s *PackWriterV2Suite) TestPackWriterV2_Write() {
s.NoError(err)
s.Equal(gotInserts[0].Binlogs[0].GetEntriesNum(), int64(rows))
s.Equal(gotInserts[0].Binlogs[0].GetLogPath(), "/tmp/insert_log/123/456/789/0/1")
s.Equal(gotInserts[101].Binlogs[0].GetLogPath(), "/tmp/insert_log/123/456/789/101/2")
}
func (s *PackWriterV2Suite) TestWriteEmptyInsertData() {

View File

@@ -100,28 +100,37 @@ func (s *PackedBinlogRecordSuite) SetupTest() {
s.chunkSize = uint64(1024)
}
func genTestColumnGroups(schema *schemapb.CollectionSchema) []storagecommon.ColumnGroup {
fieldBinlogs := make([]*datapb.FieldBinlog, 0)
for i, field := range schema.Fields {
fieldBinlogs = append(fieldBinlogs, &datapb.FieldBinlog{
FieldID: field.FieldID,
Binlogs: []*datapb.Binlog{
{
EntriesNum: int64(10 * (i + 1)),
LogSize: int64(1000 / (i + 1)),
},
},
})
}
return storagecommon.SplitByFieldSize(fieldBinlogs, 10)
}
func (s *PackedBinlogRecordSuite) TestPackedBinlogRecordIntegration() {
paramtable.Get().Save(paramtable.Get().CommonCfg.StorageType.Key, "local")
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
rows := 10000
readBatchSize := 1024
columnGroups := genTestColumnGroups(s.schema)
columnGroups := []storagecommon.ColumnGroup{
{
GroupID: 0,
Columns: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
},
{
GroupID: 102,
Columns: []int{13},
},
{
GroupID: 103,
Columns: []int{14},
},
{
GroupID: 104,
Columns: []int{15},
},
{
GroupID: 105,
Columns: []int{16},
},
{
GroupID: 106,
Columns: []int{17},
},
}
wOption := []RwOption{
WithUploader(func(ctx context.Context, kvs map[string][]byte) error {
return s.mockBinlogIO.Upload(ctx, kvs)
@@ -197,7 +206,20 @@ func (s *PackedBinlogRecordSuite) TestPackedBinlogRecordIntegration() {
func (s *PackedBinlogRecordSuite) TestGenerateBM25Stats() {
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
s.schema = genCollectionSchemaWithBM25()
columnGroups := genTestColumnGroups(s.schema)
columnGroups := []storagecommon.ColumnGroup{
{
GroupID: 0,
Columns: []int{0, 1, 2},
},
{
GroupID: 101,
Columns: []int{3},
},
{
GroupID: 102,
Columns: []int{4},
},
}
wOption := []RwOption{
WithUploader(func(ctx context.Context, kvs map[string][]byte) error {
return s.mockBinlogIO.Upload(ctx, kvs)
@@ -251,7 +273,12 @@ func (s *PackedBinlogRecordSuite) TestNoPrimaryKeyError() {
s.schema = &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{FieldID: 13, Name: "field12", DataType: schemapb.DataType_JSON},
}}
columnGroups := genTestColumnGroups(s.schema)
columnGroups := []storagecommon.ColumnGroup{
{
GroupID: 0,
Columns: []int{0},
},
}
wOption := []RwOption{
WithVersion(StorageV2),
WithColumnGroups(columnGroups),
@@ -264,7 +291,12 @@ func (s *PackedBinlogRecordSuite) TestConvertArrowSchemaError() {
s.schema = &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{FieldID: 14, Name: "field13", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{}},
}}
columnGroups := genTestColumnGroups(s.schema)
columnGroups := []storagecommon.ColumnGroup{
{
GroupID: 0,
Columns: []int{0},
},
}
wOption := []RwOption{
WithVersion(StorageV2),
WithColumnGroups(columnGroups),
@@ -282,7 +314,12 @@ func (s *PackedBinlogRecordSuite) TestEmptyBinlog() {
}
func (s *PackedBinlogRecordSuite) TestAllocIDExhausedError() {
columnGroups := genTestColumnGroups(s.schema)
columnGroups := []storagecommon.ColumnGroup{
{
GroupID: 0,
Columns: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17},
},
}
wOption := []RwOption{
WithVersion(StorageV2),
WithColumnGroups(columnGroups),

View File

@@ -141,12 +141,12 @@ type packedRecordWriter struct {
bufferSize int64
columnGroups []storagecommon.ColumnGroup
bucketName string
paths []string
pathsMap map[typeutil.UniqueID]string
schema *schemapb.CollectionSchema
arrowSchema *arrow.Schema
rowNum int64
writtenUncompressed uint64
columnGroupUncompressed []uint64
columnGroupUncompressed map[typeutil.UniqueID]uint64
storageConfig *indexpb.StorageConfig
}
@@ -166,9 +166,9 @@ func (pw *packedRecordWriter) Write(r Record) error {
for col, arr := range rec.Columns() {
size := arr.Data().SizeInBytes()
pw.writtenUncompressed += size
for columnGroup, group := range pw.columnGroups {
if lo.Contains(group.Columns, col) {
pw.columnGroupUncompressed[columnGroup] += size
for _, columnGroup := range pw.columnGroups {
if lo.Contains(columnGroup.Columns, col) {
pw.columnGroupUncompressed[columnGroup.GroupID] += size
break
}
}
@@ -181,12 +181,18 @@ func (pw *packedRecordWriter) GetWrittenUncompressed() uint64 {
return pw.writtenUncompressed
}
func (pw *packedRecordWriter) GetColumnGroupWrittenUncompressed(columnGroup int) uint64 {
return pw.columnGroupUncompressed[columnGroup]
func (pw *packedRecordWriter) GetColumnGroupWrittenUncompressed(columnGroup typeutil.UniqueID) uint64 {
if size, ok := pw.columnGroupUncompressed[columnGroup]; ok {
return size
}
return 0
}
func (pw *packedRecordWriter) GetWrittenPaths() []string {
return pw.paths
func (pw *packedRecordWriter) GetWrittenPaths(columnGroup typeutil.UniqueID) string {
if path, ok := pw.pathsMap[columnGroup]; ok {
return path
}
return ""
}
func (pw *packedRecordWriter) GetWrittenRowNum() int64 {
@@ -219,14 +225,23 @@ func NewPackedRecordWriter(bucketName string, paths []string, schema *schemapb.C
return nil, merr.WrapErrServiceInternal(
fmt.Sprintf("can not new packed record writer %s", err.Error()))
}
columnGroupUncompressed := make([]uint64, len(columnGroups))
columnGroupUncompressed := make(map[typeutil.UniqueID]uint64)
pathsMap := make(map[typeutil.UniqueID]string)
if len(paths) != len(columnGroups) {
return nil, merr.WrapErrParameterInvalid(len(paths), len(columnGroups),
"paths length is not equal to column groups length for packed record writer")
}
for i, columnGroup := range columnGroups {
columnGroupUncompressed[columnGroup.GroupID] = 0
pathsMap[columnGroup.GroupID] = paths[i]
}
return &packedRecordWriter{
writer: writer,
schema: schema,
arrowSchema: arrowSchema,
bufferSize: bufferSize,
bucketName: bucketName,
paths: paths,
pathsMap: pathsMap,
columnGroups: columnGroups,
columnGroupUncompressed: columnGroupUncompressed,
storageConfig: storageConfig,
@@ -236,12 +251,12 @@ func NewPackedRecordWriter(bucketName string, paths []string, schema *schemapb.C
func NewPackedSerializeWriter(bucketName string, paths []string, schema *schemapb.CollectionSchema, bufferSize int64,
multiPartUploadSize int64, columnGroups []storagecommon.ColumnGroup, batchSize int,
) (*SerializeWriterImpl[*Value], error) {
PackedBinlogRecordWriter, err := NewPackedRecordWriter(bucketName, paths, schema, bufferSize, multiPartUploadSize, columnGroups, nil)
packedRecordWriter, err := NewPackedRecordWriter(bucketName, paths, schema, bufferSize, multiPartUploadSize, columnGroups, nil)
if err != nil {
return nil, merr.WrapErrServiceInternal(
fmt.Sprintf("can not new packed record writer %s", err.Error()))
}
return NewSerializeRecordWriter(PackedBinlogRecordWriter, func(v []*Value) (Record, error) {
return NewSerializeRecordWriter(packedRecordWriter, func(v []*Value) (Record, error) {
return ValueSerializer(v, schema.Fields)
}, batchSize), nil
}
@@ -332,21 +347,21 @@ func (pw *PackedBinlogRecordWriter) Write(r Record) error {
func (pw *PackedBinlogRecordWriter) splitColumnByRecord(r Record) []storagecommon.ColumnGroup {
groups := make([]storagecommon.ColumnGroup, 0)
shortColumnGroup := storagecommon.ColumnGroup{Columns: make([]int, 0)}
shortColumnGroup := storagecommon.ColumnGroup{Columns: make([]int, 0), GroupID: storagecommon.DefaultShortColumnGroupID}
for i, field := range pw.schema.Fields {
arr := r.Column(field.FieldID)
size := arr.Data().SizeInBytes()
rows := uint64(arr.Len())
if IsVectorDataType(field.DataType) || field.DataType == schemapb.DataType_Text {
groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}})
groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}, GroupID: field.GetFieldID()})
} else if rows != 0 && int64(size/rows) >= packed.ColumnGroupSizeThreshold {
groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}})
groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}, GroupID: field.GetFieldID()})
} else {
shortColumnGroup.Columns = append(shortColumnGroup.Columns, i)
}
}
if len(shortColumnGroup.Columns) > 0 {
groups = append(groups, shortColumnGroup)
groups = append([]storagecommon.ColumnGroup{shortColumnGroup}, groups...)
}
return groups
}
@@ -361,8 +376,8 @@ func (pw *PackedBinlogRecordWriter) initWriters(r Record) error {
return err
}
paths := []string{}
for columnGroup := range pw.columnGroups {
path := metautil.BuildInsertLogPath(pw.rootPath, pw.collectionID, pw.partitionID, pw.segmentID, typeutil.UniqueID(columnGroup), logIdStart)
for _, columnGroup := range pw.columnGroups {
path := metautil.BuildInsertLogPath(pw.rootPath, pw.collectionID, pw.partitionID, pw.segmentID, columnGroup.GroupID, logIdStart)
paths = append(paths, path)
logIdStart++
}
@@ -401,17 +416,17 @@ func (pw *PackedBinlogRecordWriter) finalizeBinlogs() {
if pw.fieldBinlogs == nil {
pw.fieldBinlogs = make(map[FieldID]*datapb.FieldBinlog, len(pw.columnGroups))
}
for columnGroup := range pw.columnGroups {
columnGroupID := typeutil.UniqueID(columnGroup)
for _, columnGroup := range pw.columnGroups {
columnGroupID := columnGroup.GroupID
if _, exists := pw.fieldBinlogs[columnGroupID]; !exists {
pw.fieldBinlogs[columnGroupID] = &datapb.FieldBinlog{
FieldID: columnGroupID,
}
}
pw.fieldBinlogs[columnGroupID].Binlogs = append(pw.fieldBinlogs[columnGroupID].Binlogs, &datapb.Binlog{
LogSize: int64(pw.writer.columnGroupUncompressed[columnGroup]), // TODO: should provide the log size of each column group file in storage v2
MemorySize: int64(pw.writer.columnGroupUncompressed[columnGroup]),
LogPath: pw.writer.GetWrittenPaths()[columnGroupID],
LogSize: int64(pw.writer.GetColumnGroupWrittenUncompressed(columnGroupID)),
MemorySize: int64(pw.writer.GetColumnGroupWrittenUncompressed(columnGroupID)),
LogPath: pw.writer.GetWrittenPaths(columnGroupID),
EntriesNum: pw.writer.GetWrittenRowNum(),
TimestampFrom: pw.tsFrom,
TimestampTo: pw.tsTo,

View File

@@ -43,7 +43,7 @@ func TestPackedSerde(t *testing.T) {
reader, err := NewBinlogDeserializeReader(generateTestSchema(), MakeBlobsReader(blobs))
assert.NoError(t, err)
group := storagecommon.ColumnGroup{}
group := storagecommon.ColumnGroup{GroupID: storagecommon.DefaultShortColumnGroupID}
for i := 0; i < len(schema.Fields); i++ {
group.Columns = append(group.Columns, i)
}

View File

@@ -17,33 +17,15 @@
package storagecommon
import (
"github.com/samber/lo"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
const (
// column group id for short columns
DefaultShortColumnGroupID = 0
)
type ColumnGroup struct {
GroupID typeutil.UniqueID
Columns []int // column indices
}
// split by row average size
func SplitByFieldSize(fieldBinlogs []*datapb.FieldBinlog, splitThresHold int64) []ColumnGroup {
groups := make([]ColumnGroup, 0)
shortColumnGroup := ColumnGroup{Columns: make([]int, 0)}
for i, fieldBinlog := range fieldBinlogs {
if len(fieldBinlog.Binlogs) == 0 {
continue
}
totalSize := lo.SumBy(fieldBinlog.Binlogs, func(b *datapb.Binlog) int64 { return b.LogSize })
totalNumRows := lo.SumBy(fieldBinlog.Binlogs, func(b *datapb.Binlog) int64 { return b.EntriesNum })
if totalSize/totalNumRows >= splitThresHold {
groups = append(groups, ColumnGroup{Columns: []int{i}})
} else {
shortColumnGroup.Columns = append(shortColumnGroup.Columns, i)
}
}
if len(shortColumnGroup.Columns) > 0 {
groups = append(groups, shortColumnGroup)
}
return groups
}

View File

@@ -1,127 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storagecommon
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
)
func TestSplitByFieldSize(t *testing.T) {
tests := []struct {
name string
fieldBinlogs []*datapb.FieldBinlog
splitThresHold int64
expected []ColumnGroup
}{
{
name: "Empty input",
fieldBinlogs: []*datapb.FieldBinlog{},
splitThresHold: 100,
expected: []ColumnGroup{},
},
{
name: "Empty binlogs",
fieldBinlogs: []*datapb.FieldBinlog{{FieldID: 0, Binlogs: []*datapb.Binlog{}}},
splitThresHold: 100,
expected: []ColumnGroup{},
},
{
name: "above threshold",
fieldBinlogs: []*datapb.FieldBinlog{
{
FieldID: 0,
Binlogs: []*datapb.Binlog{
{LogSize: 1000, EntriesNum: 10},
},
},
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{LogSize: 2000, EntriesNum: 10},
},
},
},
splitThresHold: 50,
expected: []ColumnGroup{
{Columns: []int{0}},
{Columns: []int{1}},
},
},
{
name: "one field",
fieldBinlogs: []*datapb.FieldBinlog{
{
FieldID: 0,
Binlogs: []*datapb.Binlog{
{LogSize: 100, EntriesNum: 10},
},
},
},
splitThresHold: 50,
expected: []ColumnGroup{
{Columns: []int{0}},
},
},
{
name: "Multiple fields, mixed sizes",
fieldBinlogs: []*datapb.FieldBinlog{
{
FieldID: 0,
Binlogs: []*datapb.Binlog{ // (above)
{LogSize: 500, EntriesNum: 5},
{LogSize: 500, EntriesNum: 5},
},
},
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{LogSize: 200, EntriesNum: 20}, // (below)
},
},
{
FieldID: 2,
Binlogs: []*datapb.Binlog{
{LogSize: 500, EntriesNum: 10}, // (threshold)
},
},
{
FieldID: 3,
Binlogs: []*datapb.Binlog{
{LogSize: 400, EntriesNum: 10}, // (below)
},
},
},
splitThresHold: 50,
expected: []ColumnGroup{
{Columns: []int{0}},
{Columns: []int{2}},
{Columns: []int{1, 3}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := SplitByFieldSize(tt.fieldBinlogs, tt.splitThresHold)
assert.Equal(t, tt.expected, result)
})
}
}

View File

@@ -79,7 +79,7 @@ func (suite *PackedTestSuite) TestPackedOneFile() {
batches := 100
paths := []string{"/tmp/100"}
columnGroups := []storagecommon.ColumnGroup{{Columns: []int{0, 1, 2}}}
columnGroups := []storagecommon.ColumnGroup{{Columns: []int{0, 1, 2}, GroupID: storagecommon.DefaultShortColumnGroupID}}
bufferSize := int64(10 * 1024 * 1024) // 10MB
multiPartUploadSize := int64(0)
pw, err := NewPackedWriter(paths, suite.schema, bufferSize, multiPartUploadSize, columnGroups, nil)
@@ -131,7 +131,7 @@ func (suite *PackedTestSuite) TestPackedMultiFiles() {
rec := b.NewRecord()
defer rec.Release()
paths := []string{"/tmp/100", "/tmp/101"}
columnGroups := []storagecommon.ColumnGroup{{Columns: []int{2}}, {Columns: []int{0, 1}}}
columnGroups := []storagecommon.ColumnGroup{{Columns: []int{2}, GroupID: 2}, {Columns: []int{0, 1}, GroupID: storagecommon.DefaultShortColumnGroupID}}
bufferSize := int64(-1) // unlimited
multiPartUploadSize := int64(0)
pw, err := NewPackedWriter(paths, suite.schema, bufferSize, multiPartUploadSize, columnGroups, nil)