fix: use tbb::concurrent_unordered_map for ChunkedSegmentSealedImpl::fields_ (#44084)

issue: https://github.com/milvus-io/milvus/issues/44078

Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>
Authored by Buqian Zheng on 2025-08-29 10:01:51 +08:00; committed by GitHub
parent 844caf5cfe
commit 6b22661c06
2 changed files with 97 additions and 66 deletions
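The diff below guards ChunkedSegmentSealedImpl::fields_ behind folly::Synchronized instead of leaving it as a bare std::unordered_map, so loaders and readers no longer touch the map without a lock. For readers unfamiliar with folly::Synchronized, here is a minimal standalone sketch of the access pattern the patch adopts; the types and names are placeholders, not the real Milvus ones:

    #include <folly/Synchronized.h>
    #include <memory>
    #include <unordered_map>

    // Placeholder column type; the real code stores std::shared_ptr<ChunkedColumnInterface>.
    struct Column {};
    using ColumnPtr = std::shared_ptr<Column>;

    folly::Synchronized<std::unordered_map<int, ColumnPtr>> fields;

    void insert_column(int field_id, ColumnPtr col) {
        // wlock() returns an RAII handle that holds the exclusive lock for the statement.
        fields.wlock()->emplace(field_id, col);
    }

    ColumnPtr get_column(int field_id) {
        ColumnPtr res;
        // withRLock() runs the lambda under the shared lock; the shared_ptr is
        // copied out so the column stays alive after the lock is released.
        fields.withRLock([&](const auto& map) {
            auto it = map.find(field_id);
            if (it != map.end()) {
                res = it->second;
            }
        });
        return res;
    }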

ChunkedSegmentSealedImpl.cpp

@ -47,8 +47,6 @@
#include "google/protobuf/message_lite.h" #include "google/protobuf/message_lite.h"
#include "index/Index.h" #include "index/Index.h"
#include "index/IndexFactory.h" #include "index/IndexFactory.h"
#include "index/JsonFlatIndex.h"
#include "index/VectorMemIndex.h"
#include "milvus-storage/common/metadata.h" #include "milvus-storage/common/metadata.h"
#include "mmap/ChunkedColumn.h" #include "mmap/ChunkedColumn.h"
#include "mmap/Types.h" #include "mmap/Types.h"
@ -65,7 +63,6 @@
#include "storage/Util.h" #include "storage/Util.h"
#include "storage/ThreadPools.h" #include "storage/ThreadPools.h"
#include "storage/MmapManager.h" #include "storage/MmapManager.h"
#include "milvus-storage/format/parquet/file_reader.h"
#include "milvus-storage/filesystem/fs.h" #include "milvus-storage/filesystem/fs.h"
#include "cachinglayer/CacheSlot.h" #include "cachinglayer/CacheSlot.h"
#include "storage/LocalChunkManagerSingleton.h" #include "storage/LocalChunkManagerSingleton.h"
@ -130,7 +127,7 @@ ChunkedSegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) {
         info.enable_mmap);
     if (request.has_raw_data && get_bit(field_data_ready_bitset_, field_id)) {
-        fields_.erase(field_id);
+        fields_.wlock()->erase(field_id);
         set_bit(field_data_ready_bitset_, field_id, false);
     }
     if (get_bit(binlog_index_bitset_, field_id)) {
@ -217,7 +214,7 @@ ChunkedSegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
         !is_pk) {
         // We do not erase the primary key field: if insert record is evicted from memory, when reloading it'll
         // need the pk field again.
-        fields_.erase(field_id);
+        fields_.wlock()->erase(field_id);
         set_bit(field_data_ready_bitset_, field_id, false);
     }
 }
@ -225,12 +222,11 @@ ChunkedSegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
 void
 ChunkedSegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) {
     switch (load_info.storage_version) {
-        case 2:
+        case 2: {
             load_column_group_data_internal(load_info);
+            auto timestamp_proxy_column = get_column(TimestampFieldID);
             // TODO check timestamp_index ready instead of check system_ready_count_
-            if (fields_.find(TimestampFieldID) != fields_.end() &&
-                system_ready_count_ == 0) {
-                auto timestamp_proxy_column = fields_.at(TimestampFieldID);
+            if (timestamp_proxy_column && system_ready_count_ == 0) {
                 int64_t num_rows;
                 for (auto& [_, info] : load_info.field_infos) {
                     num_rows = info.row_count;
@ -257,6 +253,7 @@ ChunkedSegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& load_info) {
                     num_rows);
             }
             break;
+        }
         default:
             load_field_data_internal(load_info);
             break;
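A side note on the braces added around case 2: the new local timestamp_proxy_column is declared inside the case, and without an enclosing scope a jump to default: would skip its initialization, which C++ rejects ("jump to case label crosses initialization"). An illustrative, self-contained sketch of that language rule, not code from the patch:

    #include <memory>

    void dispatch(int storage_version) {
        switch (storage_version) {
            case 2: {
                // The braces give this non-trivial local its own scope; without them,
                // reaching "default:" would bypass its initialization.
                auto column = std::make_shared<int>(42);
                (void)column;
                break;
            }
            default:
                break;
        }
    }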
@ -513,20 +510,20 @@ ChunkedSegmentSealedImpl::num_chunk_index(FieldId field_id) const {
 int64_t
 ChunkedSegmentSealedImpl::num_chunk_data(FieldId field_id) const {
-    return get_bit(field_data_ready_bitset_, field_id)
-               ? fields_.find(field_id) != fields_.end()
-                     ? fields_.at(field_id)->num_chunks()
-                     : 1
-               : 0;
+    if (!get_bit(field_data_ready_bitset_, field_id)) {
+        return 0;
+    }
+    auto column = get_column(field_id);
+    return column ? column->num_chunks() : 1;
 }

 int64_t
 ChunkedSegmentSealedImpl::num_chunk(FieldId field_id) const {
-    return get_bit(field_data_ready_bitset_, field_id)
-               ? fields_.find(field_id) != fields_.end()
-                     ? fields_.at(field_id)->num_chunks()
-                     : 1
-               : 1;
+    if (!get_bit(field_data_ready_bitset_, field_id)) {
+        return 1;
+    }
+    auto column = get_column(field_id);
+    return column ? column->num_chunks() : 1;
 }

 int64_t
@ -536,23 +533,31 @@ ChunkedSegmentSealedImpl::size_per_chunk() const {
 int64_t
 ChunkedSegmentSealedImpl::chunk_size(FieldId field_id, int64_t chunk_id) const {
-    return get_bit(field_data_ready_bitset_, field_id)
-               ? fields_.find(field_id) != fields_.end()
-                     ? fields_.at(field_id)->chunk_row_nums(chunk_id)
-                     : num_rows_.value()
-               : 0;
+    if (!get_bit(field_data_ready_bitset_, field_id)) {
+        return 0;
+    }
+    auto column = get_column(field_id);
+    return column ? column->chunk_row_nums(chunk_id) : num_rows_.value();
 }

 std::pair<int64_t, int64_t>
 ChunkedSegmentSealedImpl::get_chunk_by_offset(FieldId field_id,
                                               int64_t offset) const {
-    return fields_.at(field_id)->GetChunkIDByOffset(offset);
+    auto column = get_column(field_id);
+    AssertInfo(column != nullptr,
+               "field {} must exist when getting chunk by offset",
+               field_id.get());
+    return column->GetChunkIDByOffset(offset);
 }

 int64_t
 ChunkedSegmentSealedImpl::num_rows_until_chunk(FieldId field_id,
                                                int64_t chunk_id) const {
-    return fields_.at(field_id)->GetNumRowsUntilChunk(chunk_id);
+    auto column = get_column(field_id);
+    AssertInfo(column != nullptr,
+               "field {} must exist when getting rows until chunk",
+               field_id.get());
+    return column->GetNumRowsUntilChunk(chunk_id);
 }

 bool
@ -567,8 +572,8 @@ ChunkedSegmentSealedImpl::chunk_data_impl(FieldId field_id,
     std::shared_lock lck(mutex_);
     AssertInfo(get_bit(field_data_ready_bitset_, field_id),
                "Can't get bitset element at " + std::to_string(field_id.get()));
-    if (auto it = fields_.find(field_id); it != fields_.end()) {
-        return it->second->Span(chunk_id);
+    if (auto column = get_column(field_id)) {
+        return column->Span(chunk_id);
     }
     ThrowInfo(ErrorCode::UnexpectedError,
               "chunk_data_impl only used for chunk column field ");
@ -583,8 +588,8 @@ ChunkedSegmentSealedImpl::chunk_array_view_impl(
     std::shared_lock lck(mutex_);
     AssertInfo(get_bit(field_data_ready_bitset_, field_id),
                "Can't get bitset element at " + std::to_string(field_id.get()));
-    if (auto it = fields_.find(field_id); it != fields_.end()) {
-        return it->second->ArrayViews(chunk_id, offset_len);
+    if (auto column = get_column(field_id)) {
+        return column->ArrayViews(chunk_id, offset_len);
     }
     ThrowInfo(ErrorCode::UnexpectedError,
               "chunk_array_view_impl only used for chunk column field ");
@ -599,8 +604,8 @@ ChunkedSegmentSealedImpl::chunk_vector_array_view_impl(
     std::shared_lock lck(mutex_);
     AssertInfo(get_bit(field_data_ready_bitset_, field_id),
                "Can't get bitset element at " + std::to_string(field_id.get()));
-    if (auto it = fields_.find(field_id); it != fields_.end()) {
-        return it->second->VectorArrayViews(chunk_id, offset_len);
+    if (auto column = get_column(field_id)) {
+        return column->VectorArrayViews(chunk_id, offset_len);
     }
     ThrowInfo(ErrorCode::UnexpectedError,
               "chunk_vector_array_view_impl only used for chunk column field ");
@ -615,8 +620,7 @@ ChunkedSegmentSealedImpl::chunk_string_view_impl(
     std::shared_lock lck(mutex_);
     AssertInfo(get_bit(field_data_ready_bitset_, field_id),
                "Can't get bitset element at " + std::to_string(field_id.get()));
-    if (auto it = fields_.find(field_id); it != fields_.end()) {
-        auto column = it->second;
+    if (auto column = get_column(field_id)) {
         return column->StringViews(chunk_id, offset_len);
     }
     ThrowInfo(ErrorCode::UnexpectedError,
@ -631,8 +635,8 @@ ChunkedSegmentSealedImpl::chunk_string_views_by_offsets(
     std::shared_lock lck(mutex_);
     AssertInfo(get_bit(field_data_ready_bitset_, field_id),
                "Can't get bitset element at " + std::to_string(field_id.get()));
-    if (auto it = fields_.find(field_id); it != fields_.end()) {
-        return it->second->StringViewsByOffsets(chunk_id, offsets);
+    if (auto column = get_column(field_id)) {
+        return column->StringViewsByOffsets(chunk_id, offsets);
     }
     ThrowInfo(ErrorCode::UnexpectedError,
               "chunk_view_by_offsets only used for variable column field ");
@ -646,9 +650,8 @@ ChunkedSegmentSealedImpl::chunk_array_views_by_offsets(
     std::shared_lock lck(mutex_);
     AssertInfo(get_bit(field_data_ready_bitset_, field_id),
                "Can't get bitset element at " + std::to_string(field_id.get()));
-    if (auto it = fields_.find(field_id); it != fields_.end()) {
-        auto& field_data = it->second;
-        return field_data->ArrayViewsByOffsets(chunk_id, offsets);
+    if (auto column = get_column(field_id)) {
+        return column->ArrayViewsByOffsets(chunk_id, offsets);
     }
     ThrowInfo(
         ErrorCode::UnexpectedError,
@ -789,7 +792,9 @@ ChunkedSegmentSealedImpl::vector_search(SearchInfo& search_info,
"Field Data is not loaded: " + std::to_string(field_id.get())); "Field Data is not loaded: " + std::to_string(field_id.get()));
AssertInfo(num_rows_.has_value(), "Can't get row count value"); AssertInfo(num_rows_.has_value(), "Can't get row count value");
auto row_count = num_rows_.value(); auto row_count = num_rows_.value();
auto vec_data = fields_.at(field_id); auto vec_data = get_column(field_id);
AssertInfo(
vec_data != nullptr, "vector field {} not loaded", field_id.get());
// get index params for bm25 brute force // get index params for bm25 brute force
std::map<std::string, std::string> index_info; std::map<std::string, std::string> index_info;
@ -862,7 +867,7 @@ ChunkedSegmentSealedImpl::DropFieldData(const FieldId field_id) {
                field_id.get());
     std::unique_lock<std::shared_mutex> lck(mutex_);
     if (get_bit(field_data_ready_bitset_, field_id)) {
-        fields_.erase(field_id);
+        fields_.wlock()->erase(field_id);
         set_bit(field_data_ready_bitset_, field_id, false);
     }
     if (get_bit(binlog_index_bitset_, field_id)) {
@ -958,7 +963,8 @@ ChunkedSegmentSealedImpl::search_batch_pks(
     auto pk_field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
     AssertInfo(pk_field_id.get() != -1, "Primary key is -1");
-    auto pk_column = fields_.at(pk_field_id);
+    auto pk_column = get_column(pk_field_id);
+    AssertInfo(pk_column != nullptr, "primary key column not loaded");

     auto all_chunk_pins = pk_column->GetAllChunks();
@ -1044,7 +1050,8 @@ ChunkedSegmentSealedImpl::search_sorted_pk(const PkType& pk,
                                            Condition condition) const {
     auto pk_field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
     AssertInfo(pk_field_id.get() != -1, "Primary key is -1");
-    auto pk_column = fields_.at(pk_field_id);
+    auto pk_column = get_column(pk_field_id);
+    AssertInfo(pk_column != nullptr, "primary key column not loaded");
     std::vector<SegOffset> pk_offsets;
     switch (schema_->get_fields().at(pk_field_id).get_data_type()) {
         case DataType::INT64: {
@ -1127,7 +1134,8 @@ ChunkedSegmentSealedImpl::search_sorted_pk_range(proto::plan::OpType op,
                                                  Condition condition) const {
     auto pk_field_id = schema_->get_primary_field_id().value_or(FieldId(-1));
     AssertInfo(pk_field_id.get() != -1, "Primary key is -1");
-    auto pk_column = fields_.at(pk_field_id);
+    auto pk_column = get_column(pk_field_id);
+    AssertInfo(pk_column != nullptr, "primary key column not loaded");

     switch (schema_->get_fields().at(pk_field_id).get_data_type()) {
         case DataType::INT64: {
@ -1515,7 +1523,7 @@ ChunkedSegmentSealedImpl::ClearData() {
     vector_indexings_.clear();
     ngram_indexings_.clear();
     insert_record_.clear();
-    fields_.clear();
+    fields_.wlock()->clear();
     variable_fields_avg_size_.clear();
     stats_.mem_size = 0;
 }
@ -1559,9 +1567,9 @@ ChunkedSegmentSealedImpl::CreateTextIndex(FieldId field_id) {
     {
         // build
-        auto iter = fields_.find(field_id);
-        if (iter != fields_.end()) {
-            iter->second->BulkRawStringAt(
+        auto column = get_column(field_id);
+        if (column) {
+            column->BulkRawStringAt(
                 [&](std::string_view value, size_t offset, bool is_valid) {
                     index->AddTextSealed(std::string(value), is_valid, offset);
                 });
@ -1622,7 +1630,10 @@ ChunkedSegmentSealedImpl::get_raw_data(FieldId field_id,
     // DO NOT directly access the column by map like: `fields_.at(field_id)->Data()`,
     // we have to clone the shared pointer,
     // to make sure it won't get released if segment released
-    auto column = fields_.at(field_id);
+    auto column = get_column(field_id);
+    AssertInfo(column != nullptr,
+               "field {} must exist when getting raw data",
+               field_id.get());
     auto ret = fill_with_empty(field_id, count);
     if (column->IsNullable()) {
         auto dst = ret->mutable_valid_data()->mutable_data();
@ -1890,7 +1901,10 @@ ChunkedSegmentSealedImpl::bulk_subscript(
         return fill_with_empty(field_id, 0);
     }

-    auto column = fields_.at(field_id);
+    auto column = get_column(field_id);
+    AssertInfo(column != nullptr,
+               "json field {} must exist when bulk_subscript",
+               field_id.get());
     auto ret = fill_with_empty(field_id, count);
     if (column->IsNullable()) {
         auto dst = ret->mutable_valid_data()->mutable_data();
@ -1939,10 +1953,11 @@ ChunkedSegmentSealedImpl::GetFieldDataIfExist(FieldId field_id) const {
     if (!exists) {
         return {nullptr, false};
     }

-    AssertInfo(fields_.find(field_id) != fields_.end(),
+    auto column = get_column(field_id);
+    AssertInfo(column != nullptr,
                "field {} must exist if bitset is set",
                field_id.get());
-    return {fields_.at(field_id), exists};
+    return {column, exists};
 }

 bool
@ -2191,11 +2206,9 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id,
     if (row_count < field_binlog_config->GetBuildThreshold()) {
         return false;
     }
-    std::shared_ptr<ChunkedColumnInterface> vec_data{};
-    {
-        std::shared_lock lck(mutex_);
-        vec_data = fields_.at(field_id);
-    }
+    std::shared_ptr<ChunkedColumnInterface> vec_data = get_column(field_id);
+    AssertInfo(
+        vec_data != nullptr, "vector field {} not loaded", field_id.get());
     auto dim = is_sparse ? std::numeric_limits<uint32_t>::max()
                          : field_meta.get_dim();
     auto interim_index_type = field_binlog_config->GetIndexType();
@ -2292,10 +2305,13 @@ ChunkedSegmentSealedImpl::load_field_data_common(
                !get_bit(field_data_ready_bitset_, field_id),
                "non system field {} data already loaded",
                field_id.get());
-    AssertInfo(fields_.count(field_id) == 0,
-               "field {} column already exists",
-               field_id.get());
-    fields_.emplace(field_id, column);
+    bool already_exists = false;
+    fields_.withRLock([&](auto& fields) {
+        already_exists = fields.find(field_id) != fields.end();
+    });
+    AssertInfo(
+        !already_exists, "field {} column already exists", field_id.get());
+    fields_.wlock()->emplace(field_id, column);
     if (enable_mmap) {
         mmap_fields_.insert(field_id);
     }
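Here the existence check runs under withRLock (shared lock held only for the lambda) and the emplace then takes the exclusive lock separately via wlock(). A standalone sketch of the two helpers with placeholder types; as an aside, withWLock could fold the check and the insert into one critical section, though that is not what this patch does:

    #include <folly/Synchronized.h>
    #include <unordered_map>

    folly::Synchronized<std::unordered_map<int, int>> table;

    bool contains(int key) {
        bool found = false;
        table.withRLock([&](const auto& map) {   // shared lock, scoped to the lambda
            found = map.find(key) != map.end();
        });
        return found;
    }

    void insert_once(int key, int value) {
        table.withWLock([&](auto& map) {         // exclusive lock; check + insert in one section
            map.try_emplace(key, value);
        });
    }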
@ -2340,8 +2356,9 @@ ChunkedSegmentSealedImpl::load_field_data_common(
     set_bit(field_data_ready_bitset_, field_id, true);
     update_row_count(num_rows);
     if (generated_interim_index) {
-        if (auto column = fields_.find(field_id); column != fields_.end()) {
-            column->second->ManualEvictCache();
+        auto column = get_column(field_id);
+        if (column) {
+            column->ManualEvictCache();
         }
     }
 }
@ -2454,7 +2471,7 @@ ChunkedSegmentSealedImpl::fill_empty_field(const FieldMeta& field_meta) {
         }
     }

-    fields_.emplace(field_id, column);
+    fields_.wlock()->emplace(field_id, column);
     set_bit(field_data_ready_bitset_, field_id, true);
     LOG_INFO("fill empty field {} (data type {}) for growing segment {} done",
              field_meta.get_data_type(),

ChunkedSegmentSealedImpl.h

@ -13,6 +13,7 @@
 #include <tbb/concurrent_priority_queue.h>
 #include <tbb/concurrent_vector.h>
+#include <folly/Synchronized.h>
 #include <memory>
 #include <string>
@ -148,7 +149,7 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
         std::function<void(milvus::Json, size_t, bool)> fn,
         const int64_t* offsets,
         int64_t count) const override {
-        auto column = fields_.at(field_id);
+        auto column = fields_.rlock()->at(field_id);
         column->BulkRawJsonAt(fn, offsets, count);
     }
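In the call above, fields_.rlock()->at(field_id) holds the shared lock only until the end of the full expression: rlock() yields an RAII locked pointer whose temporary is destroyed at the semicolon, after the shared_ptr has been copied into column. A minimal sketch of that lifetime with placeholder types:

    #include <folly/Synchronized.h>
    #include <memory>
    #include <unordered_map>

    folly::Synchronized<std::unordered_map<int, std::shared_ptr<int>>> table;

    std::shared_ptr<int> lookup() {
        // The LockedPtr temporary from rlock() lives until the ';', so the at()
        // lookup and the shared_ptr copy both happen under the shared lock.
        auto value = table.rlock()->at(42);
        // The lock is already released here; 'value' keeps the object alive by itself.
        return value;
    }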
@ -464,6 +465,18 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
                       bool enable_mmap,
                       bool is_proxy_column);

+    std::shared_ptr<ChunkedColumnInterface>
+    get_column(FieldId field_id) const {
+        std::shared_ptr<ChunkedColumnInterface> res;
+        fields_.withRLock([&](auto& fields) {
+            auto it = fields.find(field_id);
+            if (it != fields.end()) {
+                res = it->second;
+            }
+        });
+        return res;
+    }
+
 private:
     // InsertRecord needs to pin pk column.
     friend class storagev1translator::InsertRecordTranslator;
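The helper copies the shared_ptr out while the read lock is held, so callers can keep using the column after the lock is released and a concurrent erase only drops the map's reference. Call sites therefore null-check the result instead of relying on at() throwing. A hedged sketch of the resulting call-site shape — a hypothetical member function that assumes the class's surrounding declarations, not code from the patch:

    // Hypothetical member function illustrating the call-site pattern used above.
    int64_t
    chunk_count_or_zero(FieldId field_id) const {
        auto column = get_column(field_id);   // shared_ptr copied under the read lock
        if (column == nullptr) {
            return 0;                         // field not loaded (or already dropped)
        }
        return column->num_chunks();          // safe even if the map entry is erased now
    }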
@ -507,7 +520,8 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
     SchemaPtr schema_;
     int64_t id_;

-    mutable std::unordered_map<FieldId, std::shared_ptr<ChunkedColumnInterface>>
+    mutable folly::Synchronized<
+        std::unordered_map<FieldId, std::shared_ptr<ChunkedColumnInterface>>>
         fields_;

     std::unordered_set<FieldId> mmap_fields_;