enhance: [StorageV2] field id as meta path for wide column when load (#42863)

related: #42862 #39173

Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
This commit is contained in:
sthuang 2025-06-25 11:08:48 +08:00 committed by GitHub
parent d4260b47fa
commit 0d57acb13a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 131 additions and 60 deletions

View File

@ -23,6 +23,7 @@
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "Utils.h"
@ -249,6 +250,8 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal(
size_t num_rows = storage::GetNumRowsForLoadInfo(load_info);
ArrowSchemaPtr arrow_schema = schema_->ConvertToArrowSchema();
auto field_ids = schema_->get_field_ids();
for (auto& [id, info] : load_info.field_infos) {
AssertInfo(info.row_count > 0, "The row count of field data is 0");
@ -257,12 +260,6 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal(
storage::SortByPath(insert_files);
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
auto file_reader = std::make_shared<milvus_storage::FileRowGroupReader>(
fs, insert_files[0], arrow_schema);
std::shared_ptr<milvus_storage::PackedFileMetadata> metadata =
file_reader->file_metadata();
auto field_id_mapping = metadata->GetFieldIDMapping();
std::vector<milvus_storage::RowGroupMetadataVector> row_group_meta_list;
for (const auto& file : insert_files) {
@ -272,20 +269,20 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal(
reader->file_metadata()->GetRowGroupMetadataVector());
}
milvus_storage::FieldIDList field_id_list =
metadata->GetGroupFieldIDList().GetFieldIDList(
column_group_id.get());
std::vector<FieldId> milvus_field_ids;
if (column_group_id == FieldId(DEFAULT_SHORT_COLUMN_GROUP_ID)) {
milvus_field_ids = narrow_column_field_ids_;
} else {
milvus_field_ids.push_back(column_group_id);
}
// if multiple fields share same column group
// hint for not loading certain field shall not be working for now
// warmup will be disabled only when all columns are not in load list
bool merged_in_load_list = false;
for (int i = 0; i < field_id_list.size(); ++i) {
milvus_field_ids.emplace_back(field_id_list.Get(i));
merged_in_load_list =
merged_in_load_list ||
schema_->ShallLoadField(FieldId(field_id_list.Get(i)));
for (int i = 0; i < milvus_field_ids.size(); ++i) {
merged_in_load_list = merged_in_load_list ||
schema_->ShallLoadField(milvus_field_ids[i]);
}
auto column_group_info = FieldDataInfo(column_group_id.get(),
@ -307,7 +304,7 @@ ChunkedSegmentSealedImpl::load_column_group_data_internal(
insert_files,
info.enable_mmap,
row_group_meta_list,
field_id_list,
schema_->size(),
load_info.load_priority);
auto chunked_column_group =
@ -463,6 +460,9 @@ ChunkedSegmentSealedImpl::AddFieldDataInfoForSealed(
const LoadFieldDataInfo& field_data_info) {
// copy assignment
field_data_info_ = field_data_info;
if (field_data_info_.storage_version == 2) {
init_narrow_column_field_ids(field_data_info);
}
}
// internal API: support scalar index only
@ -2014,4 +2014,23 @@ ChunkedSegmentSealedImpl::fill_empty_field(const FieldMeta& field_meta) {
set_bit(field_data_ready_bitset_, field_id, true);
}
void
ChunkedSegmentSealedImpl::init_narrow_column_field_ids(
const LoadFieldDataInfo& field_data_info) {
std::unordered_set<int64_t> column_group_ids;
for (auto& [id, info] : field_data_info.field_infos) {
int64_t group_id =
storage::ExtractGroupIdFromPath(info.insert_files[0]);
column_group_ids.insert(group_id);
}
std::vector<FieldId> narrow_column_field_ids;
for (auto& field_id : schema_->get_field_ids()) {
if (column_group_ids.find(field_id.get()) == column_group_ids.end()) {
narrow_column_field_ids.push_back(field_id);
}
}
narrow_column_field_ids_ = narrow_column_field_ids;
}
} // namespace milvus::segcore

View File

@ -405,6 +405,9 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
bool enable_mmap,
bool is_proxy_column);
void
init_narrow_column_field_ids(const LoadFieldDataInfo& field_data_info);
private:
// InsertRecord needs to pin pk column.
friend class storagev1translator::InsertRecordTranslator;
@ -437,6 +440,9 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
LoadFieldDataInfo field_data_info_;
// for storage v2
std::vector<FieldId> narrow_column_field_ids_;
SchemaPtr schema_;
int64_t id_;
mutable std::unordered_map<FieldId, std::shared_ptr<ChunkedColumnInterface>>

View File

@ -46,6 +46,7 @@
#include "milvus-storage/format/parquet/file_reader.h"
#include "milvus-storage/filesystem/fs.h"
#include "milvus-storage/common/constants.h"
namespace milvus::segcore {
@ -425,16 +426,6 @@ SegmentGrowingImpl::load_column_group_data_internal(
storage::SortByPath(insert_files);
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
auto file_reader = std::make_shared<milvus_storage::FileRowGroupReader>(
fs, insert_files[0], arrow_schema);
std::shared_ptr<milvus_storage::PackedFileMetadata> metadata =
file_reader->file_metadata();
auto field_id_mapping = metadata->GetFieldIDMapping();
milvus_storage::FieldIDList field_ids =
metadata->GetGroupFieldIDList().GetFieldIDList(
column_group_id.get());
auto column_group_info =
FieldDataInfo(column_group_id.get(), num_rows, infos.mmap_dir_path);
@ -475,9 +466,9 @@ SegmentGrowingImpl::load_column_group_data_internal(
infos.load_priority);
});
LOG_INFO("segment {} submits load fields {} task to thread pool",
LOG_INFO("segment {} submits load column group {} task to thread pool",
this->get_segment_id(),
field_ids.ToString());
info.field_id);
std::shared_ptr<milvus::ArrowDataWrapper> r;
@ -485,10 +476,19 @@ SegmentGrowingImpl::load_column_group_data_internal(
while (column_group_info.arrow_reader_channel->pop(r)) {
for (const auto& table : r->arrow_tables) {
size_t batch_num_rows = table->num_rows();
for (int i = 0; i < field_ids.size(); ++i) {
auto field_id = FieldId(field_ids.Get(i));
for (int i = 0; i < table->schema()->num_fields(); ++i) {
AssertInfo(table->schema()->field(i)->metadata()->Contains(
milvus_storage::ARROW_FIELD_ID_KEY),
"field id not found in metadata for field {}",
table->schema()->field(i)->name());
auto field_id =
std::stoll(table->schema()
->field(i)
->metadata()
->Get(milvus_storage::ARROW_FIELD_ID_KEY)
->data());
for (auto& field : schema_->get_fields()) {
if (field.second.get_id().get() != field_id.get()) {
if (field.second.get_id().get() != field_id) {
continue;
}
auto field_data = storage::CreateFieldData(
@ -498,7 +498,7 @@ SegmentGrowingImpl::load_column_group_data_internal(
: 0,
batch_num_rows);
field_data->FillFieldData(table->column(i));
field_data_map[field_id].push_back(field_data);
field_data_map[FieldId(field_id)].push_back(field_data);
}
}
}

View File

@ -20,6 +20,7 @@
#include "common/Types.h"
#include "milvus-storage/common/metadata.h"
#include "milvus-storage/filesystem/fs.h"
#include "milvus-storage/common/constants.h"
#include "storage/ThreadPools.h"
#include "segcore/memory_planner.h"
@ -43,7 +44,7 @@ GroupChunkTranslator::GroupChunkTranslator(
bool use_mmap,
const std::vector<milvus_storage::RowGroupMetadataVector>&
row_group_meta_list,
milvus_storage::FieldIDList field_id_list,
int64_t num_fields,
milvus::proto::common::LoadPriority load_priority)
: segment_id_(segment_id),
key_(fmt::format("seg_{}_cg_{}", segment_id, column_group_info.field_id)),
@ -52,10 +53,9 @@ GroupChunkTranslator::GroupChunkTranslator(
insert_files_(insert_files),
use_mmap_(use_mmap),
row_group_meta_list_(row_group_meta_list),
field_id_list_(field_id_list),
load_priority_(load_priority),
meta_(
field_id_list.size(),
num_fields,
use_mmap ? milvus::cachinglayer::StorageType::DISK
: milvus::cachinglayer::StorageType::MEMORY,
// TODO(tiered storage 2): vector may be of small size and mixed with scalar, do we force it
@ -163,9 +163,9 @@ GroupChunkTranslator::get_cells(const std::vector<cachinglayer::cid_t>& cids) {
nullptr,
load_priority_);
});
LOG_INFO("segment {} submits load fields {} task to thread pool",
LOG_INFO("segment {} submits load column group {} task to thread pool",
segment_id_,
field_id_list_.ToString());
column_group_info_.field_id);
std::shared_ptr<milvus::ArrowDataWrapper> r;
int64_t cid_idx = 0;
@ -194,8 +194,17 @@ GroupChunkTranslator::load_group_chunk(
// Create chunks for each field in this batch
std::unordered_map<FieldId, std::shared_ptr<Chunk>> chunks;
// Iterate through field_id_list to get field_id and create chunk
for (size_t i = 0; i < field_id_list_.size(); ++i) {
auto field_id = field_id_list_.Get(i);
for (int i = 0; i < table->schema()->num_fields(); ++i) {
AssertInfo(table->schema()->field(i)->metadata()->Contains(
milvus_storage::ARROW_FIELD_ID_KEY),
"field id not found in metadata for field {}",
table->schema()->field(i)->name());
auto field_id = std::stoll(table->schema()
->field(i)
->metadata()
->Get(milvus_storage::ARROW_FIELD_ID_KEY)
->data());
auto fid = milvus::FieldId(field_id);
if (fid == RowFieldID) {
// ignore row id field

View File

@ -43,7 +43,7 @@ class GroupChunkTranslator
bool use_mmap,
const std::vector<milvus_storage::RowGroupMetadataVector>&
row_group_meta_list,
milvus_storage::FieldIDList field_id_list,
int64_t num_fields,
milvus::proto::common::LoadPriority load_priority);
~GroupChunkTranslator() override;
@ -83,7 +83,6 @@ class GroupChunkTranslator
FieldDataInfo column_group_info_;
std::vector<std::string> insert_files_;
std::vector<milvus_storage::RowGroupMetadataVector> row_group_meta_list_;
milvus_storage::FieldIDList field_id_list_;
SchemaPtr schema_;
bool is_sorted_by_pk_;
ChunkedSegmentSealedImpl* chunked_segment_;

View File

@ -1060,14 +1060,7 @@ GetFieldDatasFromStorageV2(std::vector<std::vector<std::string>>& remote_files,
std::unordered_map<int64_t, std::vector<std::string>> column_group_files;
for (auto& remote_chunk_files : remote_files) {
AssertInfo(remote_chunk_files.size() > 0, "remote files size is 0");
// find second last of / to get group_id
std::string path = remote_chunk_files[0];
size_t last_slash = path.find_last_of("/");
size_t second_last_slash = path.find_last_of("/", last_slash - 1);
int64_t group_id = std::stol(path.substr(
second_last_slash + 1, last_slash - second_last_slash - 1));
int64_t group_id = ExtractGroupIdFromPath(remote_chunk_files[0]);
column_group_files[group_id] = remote_chunk_files;
}
@ -1184,4 +1177,13 @@ CacheRawDataAndFillMissing(const MemFileManagerImplPtr& file_manager,
return field_datas;
}
int64_t
ExtractGroupIdFromPath(const std::string& path) {
// find second last of / to get group_id
size_t last_slash = path.find_last_of("/");
size_t second_last_slash = path.find_last_of("/", last_slash - 1);
return std::stol(
path.substr(second_last_slash + 1, last_slash - second_last_slash - 1));
}
} // namespace milvus::storage

View File

@ -202,6 +202,9 @@ CollectFieldDataChannel(FieldDataChannelPtr& channel);
FieldDataPtr
MergeFieldData(std::vector<FieldDataPtr>& data_array);
int64_t
ExtractGroupIdFromPath(const std::string& path);
template <typename T, typename = void>
struct has_native_type : std::false_type {};
template <typename T>

View File

@ -74,15 +74,26 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam<bool> {
// Initialize file system
auto conf = milvus_storage::ArrowFileSystemConfig();
conf.storage_type = "local";
conf.root_path = "/tmp";
conf.root_path = "test_data";
milvus_storage::ArrowFileSystemSingleton::GetInstance().Init(conf);
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
// Prepare paths and column groups
std::vector<std::string> paths = {"/tmp/0.parquet", "/tmp/1.parquet"};
std::vector<std::string> paths = {"test_data/0/10000.parquet",
"test_data/102/10001.parquet",
"test_data/103/10002.parquet"};
// Create directories for the parquet files
for (const auto& path : paths) {
auto dir_path = path.substr(0, path.find_last_of('/'));
auto status = fs->CreateDir(dir_path);
EXPECT_TRUE(status.ok())
<< "Failed to create directory: " << dir_path;
}
std::vector<std::vector<int>> column_groups = {
{0, 1, 2}, {3, 4}}; // short columns and long columns
{0, 4, 3}, {2}, {1}}; // narrow columns and wide columns
auto writer_memory = 16 * 1024 * 1024;
auto storage_config = milvus_storage::StorageConfig();
@ -162,15 +173,38 @@ class TestChunkSegmentStorageV2 : public testing::TestWithParam<bool> {
false,
std::vector<std::string>({paths[0]})});
load_info.field_infos.emplace(
int64_t(1),
FieldBinlogInfo{int64_t(1),
int64_t(102),
FieldBinlogInfo{int64_t(102),
static_cast<int64_t>(row_count),
std::vector<int64_t>(chunk_num * test_data_count),
false,
std::vector<std::string>({paths[1]})});
load_info.field_infos.emplace(
int64_t(103),
FieldBinlogInfo{int64_t(103),
static_cast<int64_t>(row_count),
std::vector<int64_t>(chunk_num * test_data_count),
false,
std::vector<std::string>({paths[2]})});
load_info.mmap_dir_path = "";
load_info.storage_version = 2;
segment->LoadFieldData(load_info);
segment->AddFieldDataInfoForSealed(load_info);
for (auto& [id, info] : load_info.field_infos) {
LoadFieldDataInfo load_field_info;
load_field_info.storage_version = 2;
load_field_info.mmap_dir_path = "";
load_field_info.field_infos.emplace(id, info);
segment->LoadFieldData(load_field_info);
}
}
void
TearDown() override {
// Clean up test data directory
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
auto status = fs->DeleteDir("test_data");
ASSERT_TRUE(status.ok());
}
segcore::SegmentSealedUPtr segment;

View File

@ -103,8 +103,6 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
std::vector<milvus_storage::RowGroupMetadataVector> row_group_meta_list;
auto fr =
std::make_shared<milvus_storage::FileRowGroupReader>(fs_, paths_[0]);
auto field_id_list =
fr->file_metadata()->GetGroupFieldIDList().GetFieldIDList(0);
row_group_meta_list.push_back(
fr->file_metadata()->GetRowGroupMetadataVector());
@ -115,7 +113,7 @@ TEST_P(GroupChunkTranslatorTest, TestWithMmap) {
paths_,
use_mmap,
row_group_meta_list,
field_id_list,
schema_->get_field_ids().size(),
milvus::proto::common::LoadPriority::LOW);
// num cells

View File

@ -843,9 +843,10 @@ func (s *LocalSegment) AddFieldDataInfo(ctx context.Context, rowCount int64, fie
)
req := &segcore.AddFieldDataInfoRequest{
Fields: make([]segcore.LoadFieldDataInfo, 0, len(fields)),
RowCount: rowCount,
LoadPriority: s.loadInfo.Load().GetPriority(),
Fields: make([]segcore.LoadFieldDataInfo, 0, len(fields)),
RowCount: rowCount,
LoadPriority: s.loadInfo.Load().GetPriority(),
StorageVersion: s.loadInfo.Load().GetStorageVersion(),
}
for _, field := range fields {
req.Fields = append(req.Fields, segcore.LoadFieldDataInfo{