diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index 13846f2ce7..a5d50590b9 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -1081,10 +1081,34 @@ GetFieldDatasFromStorageV2(std::vector>& remote_files, } std::vector remote_chunk_files; + int64_t column_group_id; if (column_group_files.find(field_id) == column_group_files.end()) { + column_group_id = DEFAULT_SHORT_COLUMN_GROUP_ID; remote_chunk_files = column_group_files[DEFAULT_SHORT_COLUMN_GROUP_ID]; } else { remote_chunk_files = column_group_files[field_id]; + column_group_id = field_id; + } + + AssertInfo(remote_chunk_files.size() > 0, "remote files size is 0"); + + // find column offset + milvus_storage::FieldIDList field_id_list = storage::GetFieldIDList( + FieldId(column_group_id), remote_chunk_files[0], nullptr, fs); + size_t col_offset = -1; + for (size_t i = 0; i < field_id_list.size(); ++i) { + if (field_id_list.Get(i) == field_id) { + col_offset = i; + break; + } + } + // field not found, must be newly added field, return empty resultset + if (col_offset == -1) { + LOG_INFO( + "field {} not found in column group {}, return empty result set", + field_id, + column_group_id); + return field_data_list; } AssertInfo(fs != nullptr, @@ -1103,12 +1127,6 @@ GetFieldDatasFromStorageV2(std::vector>& remote_files, std::vector> row_group_lists; auto reader = std::make_shared( fs, column_group_file); - auto field_id_mapping = reader->file_metadata()->GetFieldIDMapping(); - auto it = field_id_mapping.find(field_id); - AssertInfo(it != field_id_mapping.end(), - "field id {} not found in field id mapping", - field_id); - auto column_offset = it->second; auto row_group_num = reader->file_metadata()->GetRowGroupMetadataVector().size(); @@ -1117,8 +1135,7 @@ GetFieldDatasFromStorageV2(std::vector>& remote_files, row_group_lists.push_back(all_row_groups); // create a schema with only the field id - auto field_schema = - reader->schema()->field(column_offset.col_index)->Copy(); + auto field_schema = reader->schema()->field(col_offset)->Copy(); auto arrow_schema = arrow::schema({field_schema}); auto status = reader->Close(); AssertInfo( @@ -1144,8 +1161,7 @@ GetFieldDatasFromStorageV2(std::vector>& remote_files, std::vector> chunked_arrays; for (const auto& table : r->arrow_tables) { num_rows += table->num_rows(); - chunked_arrays.push_back( - table->column(column_offset.col_index)); + chunked_arrays.push_back(table->column(col_offset)); } auto field_data = storage::CreateFieldData( data_type, field_schema->nullable(), dim, num_rows); diff --git a/internal/core/src/storage/storage_c.cpp b/internal/core/src/storage/storage_c.cpp index 57b5e97c40..13421cebf8 100644 --- a/internal/core/src/storage/storage_c.cpp +++ b/internal/core/src/storage/storage_c.cpp @@ -118,20 +118,27 @@ InitMmapManager(CMmapConfig c_mmap_config) { } CStatus -InitFileWriterConfig(const char* mode, uint64_t buffer_size_kb, int nr_threads) { +InitFileWriterConfig(const char* mode, + uint64_t buffer_size_kb, + int nr_threads) { try { std::string mode_str(mode); if (mode_str == "direct") { - milvus::storage::FileWriter::SetMode(milvus::storage::FileWriter::WriteMode::DIRECT); + milvus::storage::FileWriter::SetMode( + milvus::storage::FileWriter::WriteMode::DIRECT); // buffer size checking is done in FileWriter::SetBufferSize, // and it will try to find a proper and valid buffer size - milvus::storage::FileWriter::SetBufferSize(buffer_size_kb * 1024); // convert to bytes + milvus::storage::FileWriter::SetBufferSize( + buffer_size_kb * 1024); // convert to bytes } else if (mode_str == "buffered") { - milvus::storage::FileWriter::SetMode(milvus::storage::FileWriter::WriteMode::BUFFERED); + milvus::storage::FileWriter::SetMode( + milvus::storage::FileWriter::WriteMode::BUFFERED); } else { - return milvus::FailureCStatus(milvus::ConfigInvalid, "Invalid mode"); + return milvus::FailureCStatus(milvus::ConfigInvalid, + "Invalid mode"); } - milvus::storage::FileWriteWorkerPool::GetInstance().Configure(nr_threads); + milvus::storage::FileWriteWorkerPool::GetInstance().Configure( + nr_threads); return milvus::SuccessCStatus(); } catch (std::exception& e) { return milvus::FailureCStatus(&e);