fix: [StorageV2] Handle missing column creating index (#43292)

Related to #43250

Use FieldIDList to check missing field. If column is missing, return
empty resultset

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-07-14 17:06:50 +08:00 committed by GitHub
parent 19b2fc7132
commit ae48f0e484
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 39 additions and 16 deletions

View File

@ -1081,10 +1081,34 @@ GetFieldDatasFromStorageV2(std::vector<std::vector<std::string>>& remote_files,
}
std::vector<std::string> remote_chunk_files;
int64_t column_group_id;
if (column_group_files.find(field_id) == column_group_files.end()) {
column_group_id = DEFAULT_SHORT_COLUMN_GROUP_ID;
remote_chunk_files = column_group_files[DEFAULT_SHORT_COLUMN_GROUP_ID];
} else {
remote_chunk_files = column_group_files[field_id];
column_group_id = field_id;
}
AssertInfo(remote_chunk_files.size() > 0, "remote files size is 0");
// find column offset
milvus_storage::FieldIDList field_id_list = storage::GetFieldIDList(
FieldId(column_group_id), remote_chunk_files[0], nullptr, fs);
size_t col_offset = -1;
for (size_t i = 0; i < field_id_list.size(); ++i) {
if (field_id_list.Get(i) == field_id) {
col_offset = i;
break;
}
}
// field not found, must be newly added field, return empty resultset
if (col_offset == -1) {
LOG_INFO(
"field {} not found in column group {}, return empty result set",
field_id,
column_group_id);
return field_data_list;
}
AssertInfo(fs != nullptr,
@ -1103,12 +1127,6 @@ GetFieldDatasFromStorageV2(std::vector<std::vector<std::string>>& remote_files,
std::vector<std::vector<int64_t>> row_group_lists;
auto reader = std::make_shared<milvus_storage::FileRowGroupReader>(
fs, column_group_file);
auto field_id_mapping = reader->file_metadata()->GetFieldIDMapping();
auto it = field_id_mapping.find(field_id);
AssertInfo(it != field_id_mapping.end(),
"field id {} not found in field id mapping",
field_id);
auto column_offset = it->second;
auto row_group_num =
reader->file_metadata()->GetRowGroupMetadataVector().size();
@ -1117,8 +1135,7 @@ GetFieldDatasFromStorageV2(std::vector<std::vector<std::string>>& remote_files,
row_group_lists.push_back(all_row_groups);
// create a schema with only the field id
auto field_schema =
reader->schema()->field(column_offset.col_index)->Copy();
auto field_schema = reader->schema()->field(col_offset)->Copy();
auto arrow_schema = arrow::schema({field_schema});
auto status = reader->Close();
AssertInfo(
@ -1144,8 +1161,7 @@ GetFieldDatasFromStorageV2(std::vector<std::vector<std::string>>& remote_files,
std::vector<std::shared_ptr<arrow::ChunkedArray>> chunked_arrays;
for (const auto& table : r->arrow_tables) {
num_rows += table->num_rows();
chunked_arrays.push_back(
table->column(column_offset.col_index));
chunked_arrays.push_back(table->column(col_offset));
}
auto field_data = storage::CreateFieldData(
data_type, field_schema->nullable(), dim, num_rows);

View File

@ -118,20 +118,27 @@ InitMmapManager(CMmapConfig c_mmap_config) {
}
CStatus
InitFileWriterConfig(const char* mode, uint64_t buffer_size_kb, int nr_threads) {
InitFileWriterConfig(const char* mode,
uint64_t buffer_size_kb,
int nr_threads) {
try {
std::string mode_str(mode);
if (mode_str == "direct") {
milvus::storage::FileWriter::SetMode(milvus::storage::FileWriter::WriteMode::DIRECT);
milvus::storage::FileWriter::SetMode(
milvus::storage::FileWriter::WriteMode::DIRECT);
// buffer size checking is done in FileWriter::SetBufferSize,
// and it will try to find a proper and valid buffer size
milvus::storage::FileWriter::SetBufferSize(buffer_size_kb * 1024); // convert to bytes
milvus::storage::FileWriter::SetBufferSize(
buffer_size_kb * 1024); // convert to bytes
} else if (mode_str == "buffered") {
milvus::storage::FileWriter::SetMode(milvus::storage::FileWriter::WriteMode::BUFFERED);
milvus::storage::FileWriter::SetMode(
milvus::storage::FileWriter::WriteMode::BUFFERED);
} else {
return milvus::FailureCStatus(milvus::ConfigInvalid, "Invalid mode");
return milvus::FailureCStatus(milvus::ConfigInvalid,
"Invalid mode");
}
milvus::storage::FileWriteWorkerPool::GetInstance().Configure(nr_threads);
milvus::storage::FileWriteWorkerPool::GetInstance().Configure(
nr_threads);
return milvus::SuccessCStatus();
} catch (std::exception& e) {
return milvus::FailureCStatus(&e);