fix: [StorageV2] minor end-to-end issues for sync, stats, and load (#42948)

Fix the following issues found in end-to-end tests:
1. **Split column groups based on the schema**, rather than estimating by
average chunk row size, and **keep column groups consistent within a
segment**, to avoid errors when multiple column group chunks are loaded
simultaneously (see the sketch after this list).
2. **Use the sorted segment's ID** when generating the stats binlog path,
so that file paths resolve consistently and correctly.
3. **Determine field IDs as follows** (also sketched below):
   - For multi-column column groups, read the field ID list from file
     metadata.
   - For single-column column groups, use the column group ID directly as
     the field ID.
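
For reference, a minimal Go sketch of the split rule (item 1) and the field ID
dispatch (item 3). The `columnGroup` type, the metadata map, and the two
constants are simplified stand-ins (assumed values noted in comments) for
`storagecommon.ColumnGroup`, the storage-v2 file metadata,
`DefaultShortColumnGroupID`, and `START_USER_FIELDID`; the actual
implementations are `SplitBySchema` and `GetFieldIDList` in the diff below.

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
)

// columnGroup is a simplified stand-in for storagecommon.ColumnGroup.
type columnGroup struct {
	GroupID int64
	Columns []int // indices into the schema's field list
}

// Assumed values; the real code uses DefaultShortColumnGroupID and
// START_USER_FIELDID.
const (
	defaultShortColumnGroupID int64 = 0
	startUserFieldID          int64 = 100
)

func isVectorDataType(dt schemapb.DataType) bool {
	switch dt {
	case schemapb.DataType_BinaryVector,
		schemapb.DataType_Float16Vector,
		schemapb.DataType_BFloat16Vector,
		schemapb.DataType_Int8Vector,
		schemapb.DataType_FloatVector,
		schemapb.DataType_SparseFloatVector:
		return true
	}
	return false
}

// splitBySchema applies the schema-based rule from item 1: vector and Text
// fields each become a single-column group keyed by their field ID; all
// remaining narrow columns merge into one short group, placed first.
func splitBySchema(fields []*schemapb.FieldSchema) []columnGroup {
	groups := make([]columnGroup, 0)
	short := columnGroup{GroupID: defaultShortColumnGroupID}
	for i, f := range fields {
		if isVectorDataType(f.GetDataType()) || f.GetDataType() == schemapb.DataType_Text {
			groups = append(groups, columnGroup{GroupID: f.GetFieldID(), Columns: []int{i}})
		} else {
			short.Columns = append(short.Columns, i)
		}
	}
	if len(short.Columns) > 0 {
		groups = append([]columnGroup{short}, groups...)
	}
	return groups
}

// fieldIDsForGroup illustrates the dispatch rule from item 3: a group ID at
// or above the user-field threshold is a single-column group whose field ID
// is the group ID itself; otherwise the field ID list comes from file
// metadata (stubbed here as a map).
func fieldIDsForGroup(groupID int64, metaFieldIDs map[int64][]int64) []int64 {
	if groupID >= startUserFieldID {
		return []int64{groupID}
	}
	return metaFieldIDs[groupID]
}

func main() {
	fields := []*schemapb.FieldSchema{
		{FieldID: 100, DataType: schemapb.DataType_Int64},
		{FieldID: 101, DataType: schemapb.DataType_FloatVector},
		{FieldID: 102, DataType: schemapb.DataType_VarChar},
	}
	// Prints [{GroupID:0 Columns:[0 2]} {GroupID:101 Columns:[1]}]
	fmt.Printf("%+v\n", splitBySchema(fields))
	// The short group (ID 0) resolves its field IDs via metadata.
	meta := map[int64][]int64{0: {100, 102}}
	fmt.Println(fieldIDsForGroup(0, meta), fieldIDsForGroup(101, meta))
}
```

Because the split depends only on the schema, every flush of a segment
produces the same column groups, which is what makes loading chunks from
multiple column groups safe.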

related: #39173 
fix: #42862

---------

Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
sthuang 2025-06-27 14:44:42 +08:00 committed by GitHub
parent 94e2b05ffd
commit 238bd30f42
14 changed files with 131 additions and 155 deletions

View File

@@ -48,6 +48,7 @@
#include "index/IndexFactory.h"
#include "index/JsonFlatIndex.h"
#include "index/VectorMemIndex.h"
#include "milvus-storage/common/metadata.h"
#include "mmap/ChunkedColumn.h"
#include "mmap/Types.h"
#include "monitor/prometheus_client.h"
@@ -174,7 +175,8 @@ ChunkedSegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
index.nested_path = path;
index.field_id = field_id;
index.index = std::move(const_cast<LoadIndexInfo&>(info).cache_index);
index.cast_type = JsonCastType::FromString(info.index_params.at(JSON_CAST_TYPE));
index.cast_type =
JsonCastType::FromString(info.index_params.at(JSON_CAST_TYPE));
json_indices.push_back(std::move(index));
return;
}
@@ -250,8 +252,6 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal(
size_t num_rows = storage::GetNumRowsForLoadInfo(load_info);
ArrowSchemaPtr arrow_schema = schema_->ConvertToArrowSchema();
auto field_ids = schema_->get_field_ids();
for (auto& [id, info] : load_info.field_infos) {
AssertInfo(info.row_count > 0, "The row count of field data is 0");
@@ -261,26 +261,29 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal(
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
milvus_storage::FieldIDList field_id_list = storage::GetFieldIDList(
column_group_id, insert_files[0], arrow_schema, fs);
std::vector<milvus_storage::RowGroupMetadataVector> row_group_meta_list;
for (const auto& file : insert_files) {
auto reader =
std::make_shared<milvus_storage::FileRowGroupReader>(fs, file);
row_group_meta_list.push_back(
reader->file_metadata()->GetRowGroupMetadataVector());
}
std::vector<FieldId> milvus_field_ids;
if (column_group_id == FieldId(DEFAULT_SHORT_COLUMN_GROUP_ID)) {
milvus_field_ids = narrow_column_field_ids_;
} else {
milvus_field_ids.push_back(column_group_id);
auto status = reader->Close();
AssertInfo(status.ok(),
"failed to close file reader when get row group "
"metadata from file: " +
file + " with error: " + status.ToString());
}
// if multiple fields share same column group
// hint for not loading certain field shall not be working for now
// warmup will be disabled only when all columns are not in load list
bool merged_in_load_list = false;
for (int i = 0; i < milvus_field_ids.size(); ++i) {
std::vector<FieldId> milvus_field_ids;
for (int i = 0; i < field_id_list.size(); ++i) {
milvus_field_ids.push_back(FieldId(field_id_list.Get(i)));
merged_in_load_list = merged_in_load_list ||
schema_->ShallLoadField(milvus_field_ids[i]);
}
@@ -289,10 +292,13 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal(
num_rows,
load_info.mmap_dir_path,
merged_in_load_list);
LOG_INFO("segment {} loads column group {} with num_rows {}",
this->get_segment_id(),
column_group_id.get(),
num_rows);
LOG_INFO(
"segment {} loads column group {} with field ids {} with num_rows "
"{}",
this->get_segment_id(),
column_group_id.get(),
field_id_list.ToString(),
num_rows);
auto field_metas = schema_->get_field_metas(milvus_field_ids);
@@ -304,7 +310,7 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal(
insert_files,
info.enable_mmap,
row_group_meta_list,
schema_->size(),
milvus_field_ids.size(),
load_info.load_priority);
auto chunked_column_group =
@@ -460,9 +466,6 @@ ChunkedSegmentSealedImpl::AddFieldDataInfoForSealed(
const LoadFieldDataInfo& field_data_info) {
// copy assignment
field_data_info_ = field_data_info;
if (field_data_info_.storage_version == 2) {
init_narrow_column_field_ids(field_data_info);
}
}
// internal API: support scalar index only
@@ -2014,23 +2017,4 @@ ChunkedSegmentSealedImpl::fill_empty_field(const FieldMeta& field_meta) {
set_bit(field_data_ready_bitset_, field_id, true);
}
void
ChunkedSegmentSealedImpl::init_narrow_column_field_ids(
const LoadFieldDataInfo& field_data_info) {
std::unordered_set<int64_t> column_group_ids;
for (auto& [id, info] : field_data_info.field_infos) {
int64_t group_id =
storage::ExtractGroupIdFromPath(info.insert_files[0]);
column_group_ids.insert(group_id);
}
std::vector<FieldId> narrow_column_field_ids;
for (auto& field_id : schema_->get_field_ids()) {
if (column_group_ids.find(field_id.get()) == column_group_ids.end()) {
narrow_column_field_ids.push_back(field_id);
}
}
narrow_column_field_ids_ = narrow_column_field_ids;
}
} // namespace milvus::segcore

View File

@@ -405,9 +405,6 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
bool enable_mmap,
bool is_proxy_column);
void
init_narrow_column_field_ids(const LoadFieldDataInfo& field_data_info);
private:
// InsertRecord needs to pin pk column.
friend class storagev1translator::InsertRecordTranslator;
@@ -440,9 +437,6 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
LoadFieldDataInfo field_data_info_;
// for storage v2
std::vector<FieldId> narrow_column_field_ids_;
SchemaPtr schema_;
int64_t id_;
mutable std::unordered_map<FieldId, std::shared_ptr<ChunkedColumnInterface>>

View File

@@ -450,6 +450,11 @@ SegmentGrowingImpl::load_column_group_data_internal(
std::vector<int64_t> all_row_groups(row_group_num);
std::iota(all_row_groups.begin(), all_row_groups.end(), 0);
row_group_lists.push_back(all_row_groups);
auto status = reader->Close();
AssertInfo(status.ok(),
"failed to close file reader when get row group "
"metadata from file: " +
file + " with error: " + status.ToString());
}
// create parallel degree split strategy

View File

@@ -950,7 +950,8 @@ LoadArrowReaderFromRemote(const std::vector<std::string>& remote_files,
auto rcm = storage::RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
auto codec_futures = storage::GetObjectData(rcm.get(), remote_files, milvus::PriorityForLoad(priority), false);
auto codec_futures = storage::GetObjectData(
rcm.get(), remote_files, milvus::PriorityForLoad(priority), false);
for (auto& codec_future : codec_futures) {
auto reader = codec_future.get()->GetReader();
channel->push(reader);
@@ -969,7 +970,8 @@ LoadFieldDatasFromRemote(const std::vector<std::string>& remote_files,
try {
auto rcm = storage::RemoteChunkManagerSingleton::GetInstance()
.GetRemoteChunkManager();
auto codec_futures = storage::GetObjectData(rcm.get(), remote_files, milvus::PriorityForLoad(priority));
auto codec_futures = storage::GetObjectData(
rcm.get(), remote_files, milvus::PriorityForLoad(priority));
for (auto& codec_future : codec_futures) {
auto field_data = codec_future.get()->GetFieldData();
channel->push(field_data);

View File

@@ -709,7 +709,9 @@ GetObjectData(ChunkManager* remote_chunk_manager,
std::vector<std::future<std::unique_ptr<DataCodec>>> futures;
futures.reserve(remote_files.size());
auto DownloadAndDeserialize = [&](ChunkManager* chunk_manager, const std::string& file, bool is_field_data) {
auto DownloadAndDeserialize = [&](ChunkManager* chunk_manager,
const std::string& file,
bool is_field_data) {
// TODO remove this Size() cost
auto fileSize = chunk_manager->Size(file);
auto buf = std::shared_ptr<uint8_t[]>(new uint8_t[fileSize]);
@@ -725,7 +727,6 @@ GetObjectData(ChunkManager* remote_chunk_manager,
return futures;
}
std::map<std::string, int64_t>
PutIndexData(ChunkManager* remote_chunk_manager,
const std::vector<const uint8_t*>& data_slices,
@@ -1098,6 +1099,11 @@ GetFieldDatasFromStorageV2(std::vector<std::vector<std::string>>& remote_files,
auto field_schema =
reader->schema()->field(column_offset.col_index)->Copy();
auto arrow_schema = arrow::schema({field_schema});
auto status = reader->Close();
AssertInfo(
status.ok(),
"failed to close file reader when get arrow schema from file: " +
column_group_file + " with error: " + status.ToString());
// split row groups for parallel reading
auto strategy = std::make_unique<segcore::ParallelDegreeSplitStrategy>(
@@ -1180,4 +1186,28 @@ ExtractGroupIdFromPath(const std::string& path) {
path.substr(second_last_slash + 1, last_slash - second_last_slash - 1));
}
// if it is multi-field column group, read field id list from file metadata
// if it is single-field column group, return the column group id
milvus_storage::FieldIDList
GetFieldIDList(FieldId column_group_id,
const std::string& filepath,
const std::shared_ptr<arrow::Schema>& arrow_schema,
milvus_storage::ArrowFileSystemPtr fs) {
milvus_storage::FieldIDList field_id_list;
if (column_group_id >= FieldId(START_USER_FIELDID)) {
field_id_list.Add(column_group_id.get());
return field_id_list;
}
auto file_reader = std::make_shared<milvus_storage::FileRowGroupReader>(
fs, filepath, arrow_schema);
field_id_list =
file_reader->file_metadata()->GetGroupFieldIDList().GetFieldIDList(
column_group_id.get());
auto status = file_reader->Close();
AssertInfo(status.ok(),
"failed to close file reader when get field id list from {}",
filepath);
return field_id_list;
}
} // namespace milvus::storage

View File

@@ -35,6 +35,7 @@
#include "storage/Types.h"
#include "milvus-storage/filesystem/fs.h"
#include "storage/ThreadPools.h"
#include "milvus-storage/common/metadata.h"
namespace milvus::storage {
@@ -269,4 +270,10 @@ ConvertFieldDataToArrowDataWrapper(const FieldDataPtr& field_data) {
file_data);
}
milvus_storage::FieldIDList
GetFieldIDList(FieldId column_group_id,
const std::string& filepath,
const std::shared_ptr<arrow::Schema>& arrow_schema,
milvus_storage::ArrowFileSystemPtr fs);
} // namespace milvus::storage

View File

@@ -106,6 +106,11 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
row_group_meta_list.push_back(
fr->file_metadata()->GetRowGroupMetadataVector());
auto status = fr->Close();
AssertInfo(
status.ok(),
"failed to close file reader when get row group metadata from file: " +
paths_[0] + " with error: " + status.ToString());
GroupChunkTranslator translator(segment_id_,
field_metas,

View File

@@ -200,6 +200,11 @@ TEST_F(TestGrowingStorageV2, LoadWithStrategy) {
auto fr = std::make_shared<milvus_storage::FileRowGroupReader>(
fs_, paths[0], schema_);
auto row_group_metadata = fr->file_metadata()->GetRowGroupMetadataVector();
auto status = fr->Close();
AssertInfo(
status.ok(),
"failed to close file reader when get row group metadata from file: " +
paths[0] + " with error: " + status.ToString());
std::vector<int64_t> row_groups(row_group_metadata.size());
std::iota(row_groups.begin(), row_groups.end(), 0);
std::vector<std::vector<int64_t>> row_group_lists = {row_groups};

View File

@@ -430,6 +430,9 @@ func (st *statsTask) createTextIndex(ctx context.Context,
})
getInsertFiles := func(fieldID int64) ([]string, error) {
if st.req.GetStorageVersion() == storage.StorageV2 {
return []string{}, nil
}
binlogs, ok := fieldBinlogs[fieldID]
if !ok {
return nil, fmt.Errorf("field binlog not found for field %d", fieldID)
@@ -461,7 +464,9 @@
return err
}
buildIndexParams := buildIndexParams(st.req, files, field, newStorageConfig, 0)
req := proto.Clone(st.req).(*workerpb.CreateStatsRequest)
req.InsertLogs = insertBinlogs
buildIndexParams := buildIndexParams(req, files, field, newStorageConfig, 0)
uploaded, err := indexcgowrapper.CreateTextIndex(ctx, buildIndexParams)
if err != nil {
@@ -521,6 +526,9 @@ func (st *statsTask) createJSONKeyStats(ctx context.Context,
})
getInsertFiles := func(fieldID int64) ([]string, error) {
if st.req.GetStorageVersion() == storage.StorageV2 {
return []string{}, nil
}
binlogs, ok := fieldBinlogs[fieldID]
if !ok {
return nil, fmt.Errorf("field binlog not found for field %d", fieldID)
@@ -551,7 +559,9 @@
return err
}
buildIndexParams := buildIndexParams(st.req, files, field, newStorageConfig, tantivyMemory)
req := proto.Clone(st.req).(*workerpb.CreateStatsRequest)
req.InsertLogs = insertBinlogs
buildIndexParams := buildIndexParams(req, files, field, newStorageConfig, tantivyMemory)
uploaded, err := indexcgowrapper.CreateJSONKeyStats(ctx, buildIndexParams)
if err != nil {
@@ -614,7 +624,9 @@ func buildIndexParams(
req.GetStorageConfig(),
req.GetCollectionID(),
req.GetPartitionID(),
req.GetTargetSegmentID())
req.GetTargetSegmentID(),
)
log.Info("build index params", zap.Any("segment insert files", params.SegmentInsertFiles))
}
return params

View File

@@ -18,13 +18,10 @@ package syncmgr
import (
"context"
"fmt"
"math"
"github.com/apache/arrow/go/v17/arrow/array"
"github.com/apache/arrow/go/v17/arrow/memory"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@@ -32,7 +29,6 @@ import (
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/storagecommon"
"github.com/milvus-io/milvus/internal/storagev2/packed"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
@@ -106,10 +102,7 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m
if len(pack.insertData) == 0 {
return make(map[int64]*datapb.FieldBinlog), nil
}
columnGroups, err := bw.splitInsertData(pack.insertData, packed.ColumnGroupSizeThreshold)
if err != nil {
return nil, err
}
columnGroups := storagecommon.SplitBySchema(bw.schema.GetFields())
rec, err := bw.serializeBinlog(ctx, pack)
if err != nil {
@@ -167,45 +160,6 @@ func (bw *BulkPackWriterV2) writeInserts(ctx context.Context, pack *SyncPack) (m
return logs, nil
}
// split by row average size
func (bw *BulkPackWriterV2) splitInsertData(insertData []*storage.InsertData, splitThresHold int64) ([]storagecommon.ColumnGroup, error) {
groups := make([]storagecommon.ColumnGroup, 0)
shortColumnGroup := storagecommon.ColumnGroup{Columns: make([]int, 0), GroupID: storagecommon.DefaultShortColumnGroupID}
memorySizes := make(map[storage.FieldID]int64, len(insertData[0].Data))
rowNums := make(map[storage.FieldID]int64, len(insertData[0].Data))
for _, data := range insertData {
for fieldID, fieldData := range data.Data {
if _, ok := memorySizes[fieldID]; !ok {
memorySizes[fieldID] = 0
rowNums[fieldID] = 0
}
memorySizes[fieldID] += int64(fieldData.GetMemorySize())
rowNums[fieldID] += int64(fieldData.RowNum())
}
}
uniqueRows := lo.Uniq(lo.Values(rowNums))
if len(uniqueRows) != 1 || uniqueRows[0] == 0 {
return nil, errors.New("row num is not equal for each field")
}
for i, field := range bw.schema.GetFields() {
if _, ok := memorySizes[field.FieldID]; !ok {
return nil, fmt.Errorf("field %d not found in insert data", field.FieldID)
}
// Check if the field is a vector type
if storage.IsVectorDataType(field.DataType) || field.DataType == schemapb.DataType_Text {
groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}, GroupID: field.GetFieldID()})
} else if rowNums[field.FieldID] != 0 && memorySizes[field.FieldID]/rowNums[field.FieldID] >= splitThresHold {
groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}, GroupID: field.GetFieldID()})
} else {
shortColumnGroup.Columns = append(shortColumnGroup.Columns, i)
}
}
if len(shortColumnGroup.Columns) > 0 {
groups = append([]storagecommon.ColumnGroup{shortColumnGroup}, groups...)
}
return groups, nil
}
func (bw *BulkPackWriterV2) serializeBinlog(ctx context.Context, pack *SyncPack) (storage.Record, error) {
if len(pack.insertData) == 0 {
return nil, nil

View File

@@ -514,19 +514,6 @@ var serdeMap = func() map[schemapb.DataType]serdeEntry {
return m
}()
func IsVectorDataType(dataType schemapb.DataType) bool {
switch dataType {
case schemapb.DataType_BinaryVector,
schemapb.DataType_Float16Vector,
schemapb.DataType_BFloat16Vector,
schemapb.DataType_Int8Vector,
schemapb.DataType_FloatVector,
schemapb.DataType_SparseFloatVector:
return true
}
return false
}
// Since parquet does not support custom fallback encoding for now,
// we disable dict encoding for primary key.
// It can be scale to all fields once parquet fallback encoding is available.
@@ -905,6 +892,9 @@ func BuildRecord(b *array.RecordBuilder, data *InsertData, fields []*schemapb.Fi
if !ok {
panic("unknown type")
}
if data.Data[field.FieldID].RowNum() == 0 {
return merr.WrapErrServiceInternal(fmt.Sprintf("row num is 0 for field %s", field.Name))
}
for j := 0; j < data.Data[field.FieldID].RowNum(); j++ {
ok = typeEntry.serialize(fBuilder, data.Data[field.FieldID].GetRow(j))
if !ok {

View File

@@ -345,31 +345,10 @@ func (pw *PackedBinlogRecordWriter) Write(r Record) error {
return nil
}
func (pw *PackedBinlogRecordWriter) splitColumnByRecord(r Record) []storagecommon.ColumnGroup {
groups := make([]storagecommon.ColumnGroup, 0)
shortColumnGroup := storagecommon.ColumnGroup{Columns: make([]int, 0), GroupID: storagecommon.DefaultShortColumnGroupID}
for i, field := range pw.schema.Fields {
arr := r.Column(field.FieldID)
size := arr.Data().SizeInBytes()
rows := uint64(arr.Len())
if IsVectorDataType(field.DataType) || field.DataType == schemapb.DataType_Text {
groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}, GroupID: field.GetFieldID()})
} else if rows != 0 && int64(size/rows) >= packed.ColumnGroupSizeThreshold {
groups = append(groups, storagecommon.ColumnGroup{Columns: []int{i}, GroupID: field.GetFieldID()})
} else {
shortColumnGroup.Columns = append(shortColumnGroup.Columns, i)
}
}
if len(shortColumnGroup.Columns) > 0 {
groups = append([]storagecommon.ColumnGroup{shortColumnGroup}, groups...)
}
return groups
}
func (pw *PackedBinlogRecordWriter) initWriters(r Record) error {
if pw.writer == nil {
if len(pw.columnGroups) == 0 {
pw.columnGroups = pw.splitColumnByRecord(r)
pw.columnGroups = storagecommon.SplitBySchema(pw.schema.Fields)
}
logIdStart, _, err := pw.allocator.Alloc(uint32(len(pw.columnGroups)))
if err != nil {

View File

@@ -215,25 +215,3 @@ func TestCalculateArraySize(t *testing.T) {
})
}
}
func TestIsVectorDataType(t *testing.T) {
tests := []struct {
name string
dt schemapb.DataType
want bool
}{
{"test float vector", schemapb.DataType_FloatVector, true},
{"test binary vector", schemapb.DataType_BinaryVector, true},
{"test float16 vector", schemapb.DataType_Float16Vector, true},
{"test bfloat16 vector", schemapb.DataType_BFloat16Vector, true},
{"test int8 vector", schemapb.DataType_Int8Vector, true},
{"test sparse float vector", schemapb.DataType_SparseFloatVector, true},
{"test sparse binary vector", schemapb.DataType_String, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := IsVectorDataType(tt.dt)
assert.Equal(t, tt.want, got)
})
}
}

View File

@@ -17,6 +17,7 @@
package storagecommon
import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
@@ -29,3 +30,33 @@ type ColumnGroup struct {
GroupID typeutil.UniqueID
Columns []int // column indices
}
// SplitBySchema splits columns into column groups based on each field's schema data type
func SplitBySchema(fields []*schemapb.FieldSchema) []ColumnGroup {
groups := make([]ColumnGroup, 0)
shortColumnGroup := ColumnGroup{Columns: make([]int, 0), GroupID: DefaultShortColumnGroupID}
for i, field := range fields {
if IsVectorDataType(field.DataType) || field.DataType == schemapb.DataType_Text {
groups = append(groups, ColumnGroup{Columns: []int{i}, GroupID: field.GetFieldID()})
} else {
shortColumnGroup.Columns = append(shortColumnGroup.Columns, i)
}
}
if len(shortColumnGroup.Columns) > 0 {
groups = append([]ColumnGroup{shortColumnGroup}, groups...)
}
return groups
}
func IsVectorDataType(dataType schemapb.DataType) bool {
switch dataType {
case schemapb.DataType_BinaryVector,
schemapb.DataType_Float16Vector,
schemapb.DataType_BFloat16Vector,
schemapb.DataType_Int8Vector,
schemapb.DataType_FloatVector,
schemapb.DataType_SparseFloatVector:
return true
}
return false
}