fix: Revert "fix: Use folly::SharedMutex preventing starvation (#43937)" (#43959)

Related to #43958

This reverts commit 580350495ab40b3c0a2ec473882258edf6d7dd08.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-08-21 10:09:47 +08:00 committed by GitHub
parent 399f63300c
commit 7963b17ac1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 46 additions and 48 deletions

View File

@ -43,7 +43,6 @@
#include "common/Tracer.h"
#include "common/Types.h"
#include "common/resource_c.h"
#include <folly/SharedMutex.h>
#include "monitor/scope_metric.h"
#include "google/protobuf/message_lite.h"
#include "index/Index.h"
@ -111,7 +110,7 @@ ChunkedSegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) {
"Can't get metric_type in index_params");
auto metric_type = info.index_params.at("metric_type");
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lck(mutex_);
AssertInfo(
!get_bit(index_ready_bitset_, field_id),
"vector index has been exist at " + std::to_string(field_id.get()));
@ -163,7 +162,7 @@ ChunkedSegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
return;
}
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lck(mutex_);
AssertInfo(
!get_bit(index_ready_bitset_, field_id),
"scalar index has been exist at " + std::to_string(field_id.get()));
@ -458,7 +457,7 @@ ChunkedSegmentSealedImpl::load_system_field_internal(FieldId field_id,
}
}
{
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lck(mutex_);
update_row_count(num_rows);
}
}
@ -548,14 +547,14 @@ ChunkedSegmentSealedImpl::num_rows_until_chunk(FieldId field_id,
bool
ChunkedSegmentSealedImpl::is_mmap_field(FieldId field_id) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
return mmap_fields_.find(field_id) != mmap_fields_.end();
}
PinWrapper<SpanBase>
ChunkedSegmentSealedImpl::chunk_data_impl(FieldId field_id,
int64_t chunk_id) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
AssertInfo(get_bit(field_data_ready_bitset_, field_id),
"Can't get bitset element at " + std::to_string(field_id.get()));
if (auto it = fields_.find(field_id); it != fields_.end()) {
@ -571,7 +570,7 @@ ChunkedSegmentSealedImpl::chunk_array_view_impl(
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len =
std::nullopt) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
AssertInfo(get_bit(field_data_ready_bitset_, field_id),
"Can't get bitset element at " + std::to_string(field_id.get()));
if (auto it = fields_.find(field_id); it != fields_.end()) {
@ -587,7 +586,7 @@ ChunkedSegmentSealedImpl::chunk_string_view_impl(
int64_t chunk_id,
std::optional<std::pair<int64_t, int64_t>> offset_len =
std::nullopt) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
AssertInfo(get_bit(field_data_ready_bitset_, field_id),
"Can't get bitset element at " + std::to_string(field_id.get()));
if (auto it = fields_.find(field_id); it != fields_.end()) {
@ -603,7 +602,7 @@ ChunkedSegmentSealedImpl::chunk_string_views_by_offsets(
FieldId field_id,
int64_t chunk_id,
const FixedVector<int32_t>& offsets) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
AssertInfo(get_bit(field_data_ready_bitset_, field_id),
"Can't get bitset element at " + std::to_string(field_id.get()));
if (auto it = fields_.find(field_id); it != fields_.end()) {
@ -618,7 +617,7 @@ ChunkedSegmentSealedImpl::chunk_array_views_by_offsets(
FieldId field_id,
int64_t chunk_id,
const FixedVector<int32_t>& offsets) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
AssertInfo(get_bit(field_data_ready_bitset_, field_id),
"Can't get bitset element at " + std::to_string(field_id.get()));
if (auto it = fields_.find(field_id); it != fields_.end()) {
@ -633,7 +632,7 @@ ChunkedSegmentSealedImpl::chunk_array_views_by_offsets(
PinWrapper<const index::IndexBase*>
ChunkedSegmentSealedImpl::chunk_index_impl(FieldId field_id,
int64_t chunk_id) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
AssertInfo(scalar_indexings_.find(field_id) != scalar_indexings_.end(),
"Cannot find scalar_indexing with field_id: " +
std::to_string(field_id.get()));
@ -647,7 +646,7 @@ ChunkedSegmentSealedImpl::chunk_index_impl(FieldId field_id,
PinWrapper<index::NgramInvertedIndex*>
ChunkedSegmentSealedImpl::GetNgramIndex(FieldId field_id) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
auto iter = scalar_indexings_.find(field_id);
if (iter == scalar_indexings_.end()) {
return PinWrapper<index::NgramInvertedIndex*>(nullptr);
@ -666,7 +665,7 @@ ChunkedSegmentSealedImpl::GetNgramIndex(FieldId field_id) const {
PinWrapper<index::NgramInvertedIndex*>
ChunkedSegmentSealedImpl::GetNgramIndexForJson(
FieldId field_id, const std::string& nested_path) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
auto iter = ngram_indexings_.find(field_id);
if (iter == ngram_indexings_.end() ||
iter->second.find(nested_path) == iter->second.end()) {
@ -688,13 +687,13 @@ ChunkedSegmentSealedImpl::GetNgramIndexForJson(
int64_t
ChunkedSegmentSealedImpl::get_row_count() const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
return num_rows_.value_or(0);
}
int64_t
ChunkedSegmentSealedImpl::get_deleted_count() const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
return deleted_record_.size();
}
@ -835,7 +834,7 @@ ChunkedSegmentSealedImpl::DropFieldData(const FieldId field_id) {
AssertInfo(!SystemProperty::Instance().IsSystem(field_id),
"Dropping system field is not supported, field id: {}",
field_id.get());
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock<std::shared_mutex> lck(mutex_);
if (get_bit(field_data_ready_bitset_, field_id)) {
fields_.erase(field_id);
set_bit(field_data_ready_bitset_, field_id, false);
@ -856,7 +855,7 @@ ChunkedSegmentSealedImpl::DropIndex(const FieldId field_id) {
"Field meta of offset:" + std::to_string(field_id.get()) +
" is not vector type");
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lck(mutex_);
vector_indexings_.drop_field_indexing(field_id);
set_bit(index_ready_bitset_, field_id, false);
}
@ -1278,7 +1277,7 @@ ChunkedSegmentSealedImpl::bulk_subscript_vector_array_impl(
void
ChunkedSegmentSealedImpl::ClearData() {
{
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lck(mutex_);
field_data_ready_bitset_.reset();
index_ready_bitset_.reset();
binlog_index_bitset_.reset();
@ -1308,7 +1307,7 @@ ChunkedSegmentSealedImpl::fill_with_empty(FieldId field_id,
void
ChunkedSegmentSealedImpl::CreateTextIndex(FieldId field_id) {
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lck(mutex_);
const auto& field_meta = schema_->operator[](field_id);
auto& cfg = storage::MmapManager::GetInstance().GetMmapConfig();
@ -1382,7 +1381,7 @@ ChunkedSegmentSealedImpl::CreateTextIndex(FieldId field_id) {
void
ChunkedSegmentSealedImpl::LoadTextIndex(
FieldId field_id, std::unique_ptr<index::TextMatchIndex> index) {
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lck(mutex_);
const auto& field_meta = schema_->operator[](field_id);
index->RegisterTokenizer("milvus_tokenizer",
field_meta.get_analyzer_params().c_str());
@ -1677,14 +1676,14 @@ ChunkedSegmentSealedImpl::bulk_subscript(
bool
ChunkedSegmentSealedImpl::HasIndex(FieldId field_id) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
return get_bit(index_ready_bitset_, field_id) |
get_bit(binlog_index_bitset_, field_id);
}
bool
ChunkedSegmentSealedImpl::HasFieldData(FieldId field_id) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
if (SystemProperty::Instance().IsSystem(field_id)) {
return is_system_field_ready();
} else {
@ -1694,7 +1693,7 @@ ChunkedSegmentSealedImpl::HasFieldData(FieldId field_id) const {
std::pair<std::shared_ptr<ChunkedColumnInterface>, bool>
ChunkedSegmentSealedImpl::GetFieldDataIfExist(FieldId field_id) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
bool exists;
if (SystemProperty::Instance().IsSystem(field_id)) {
exists = is_system_field_ready();
@ -1712,7 +1711,7 @@ ChunkedSegmentSealedImpl::GetFieldDataIfExist(FieldId field_id) const {
bool
ChunkedSegmentSealedImpl::HasRawData(int64_t field_id) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
auto fieldID = FieldId(field_id);
const auto& field_meta = schema_->operator[](fieldID);
if (IsVectorDataType(field_meta.get_data_type())) {
@ -1835,7 +1834,7 @@ ChunkedSegmentSealedImpl::debug() const {
void
ChunkedSegmentSealedImpl::LoadSegmentMeta(
const proto::segcore::LoadSegmentMeta& segment_meta) {
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lck(mutex_);
std::vector<int64_t> slice_lengths;
for (auto& info : segment_meta.metas()) {
slice_lengths.push_back(info.row_count());
@ -1958,7 +1957,7 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id,
}
std::shared_ptr<ChunkedColumnInterface> vec_data{};
{
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
vec_data = fields_.at(field_id);
}
auto dim = is_sparse ? std::numeric_limits<uint32_t>::max()
@ -1971,7 +1970,7 @@ ChunkedSegmentSealedImpl::generate_interim_index(const FieldId field_id,
auto index_metric = field_binlog_config->GetMetricType();
if (enable_binlog_index()) {
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lck(mutex_);
std::unique_ptr<
milvus::cachinglayer::Translator<milvus::index::IndexBase>>
@ -2052,7 +2051,7 @@ ChunkedSegmentSealedImpl::load_field_data_common(
bool enable_mmap,
bool is_proxy_column) {
{
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lck(mutex_);
AssertInfo(SystemProperty::Instance().IsSystem(field_id) ||
!get_bit(field_data_ready_bitset_, field_id),
"non system field {} data already loaded",
@ -2098,7 +2097,7 @@ ChunkedSegmentSealedImpl::load_field_data_common(
bool generated_interim_index = generate_interim_index(field_id, num_rows);
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lck(mutex_);
AssertInfo(!get_bit(field_data_ready_bitset_, field_id),
"field {} data already loaded",
field_id.get());
@ -2123,7 +2122,7 @@ ChunkedSegmentSealedImpl::init_timestamp_index(
index.build_with(timestamps.data(), num_rows);
// use special index
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lck(mutex_);
AssertInfo(insert_record_.timestamps_.empty(), "already exists");
insert_record_.timestamps_.set_data_raw(
0, timestamps.data(), timestamps.size());
@ -2135,7 +2134,7 @@ ChunkedSegmentSealedImpl::init_timestamp_index(
void
ChunkedSegmentSealedImpl::Reopen(SchemaPtr sch) {
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lck(mutex_);
field_data_ready_bitset_.resize(sch->size());
index_ready_bitset_.resize(sch->size());
@ -2156,7 +2155,7 @@ ChunkedSegmentSealedImpl::Reopen(SchemaPtr sch) {
void
ChunkedSegmentSealedImpl::FinishLoad() {
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lck(mutex_);
for (const auto& [field_id, field_meta] : schema_->get_fields()) {
if (field_id.get() < START_USER_FIELDID) {
continue;

View File

@ -1174,7 +1174,7 @@ SegmentGrowingImpl::mask_with_timestamps(BitsetTypeView& bitset_chunk,
void
SegmentGrowingImpl::CreateTextIndex(FieldId field_id) {
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lock(mutex_);
const auto& field_meta = schema_->operator[](field_id);
AssertInfo(IsStringDataType(field_meta.get_data_type()),
"cannot create text index on non-string type");
@ -1208,7 +1208,7 @@ SegmentGrowingImpl::AddTexts(milvus::FieldId field_id,
const bool* texts_valid_data,
size_t n,
int64_t offset_begin) {
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lock(mutex_);
auto iter = text_indexes_.find(field_id);
if (iter == text_indexes_.end()) {
throw SegcoreError(
@ -1224,7 +1224,7 @@ SegmentGrowingImpl::AddJSONDatas(FieldId field_id,
const bool* jsondatas_valid_data,
size_t n,
int64_t offset_begin) {
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lock(mutex_);
auto iter = json_indexes_.find(field_id);
AssertInfo(iter != json_indexes_.end(), "json index not found");
iter->second->AddJSONDatas(
@ -1242,7 +1242,7 @@ SegmentGrowingImpl::CreateJSONIndexes() {
void
SegmentGrowingImpl::CreateJSONIndex(FieldId field_id) {
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lock(mutex_);
const auto& field_meta = schema_->operator[](field_id);
AssertInfo(IsJsonDataType(field_meta.get_data_type()),
"cannot create json index on non-json type");

View File

@ -26,7 +26,7 @@ namespace milvus::segcore {
void
SegmentInternalInterface::FillPrimaryKeys(const query::Plan* plan,
SearchResult& results) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
AssertInfo(plan, "empty plan");
auto size = results.distances_.size();
AssertInfo(results.seg_offsets_.size() == size,
@ -51,7 +51,7 @@ SegmentInternalInterface::FillPrimaryKeys(const query::Plan* plan,
void
SegmentInternalInterface::FillTargetEntry(const query::Plan* plan,
SearchResult& results) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
AssertInfo(plan, "empty plan");
auto size = results.distances_.size();
AssertInfo(results.seg_offsets_.size() == size,
@ -86,7 +86,7 @@ SegmentInternalInterface::Search(
Timestamp timestamp,
int32_t consistency_level,
Timestamp collection_ttl) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
milvus::tracer::AddEvent("obtained_segment_lock_mutex");
check_search(plan);
query::ExecPlanNodeVisitor visitor(
@ -105,7 +105,7 @@ SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
bool ignore_non_pk,
int32_t consistency_level,
Timestamp collection_ttl) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
tracer::AutoSpan span("Retrieve", tracer::GetRootSpan());
auto results = std::make_unique<proto::segcore::RetrieveResults>();
query::ExecPlanNodeVisitor visitor(
@ -263,7 +263,7 @@ SegmentInternalInterface::Retrieve(tracer::TraceContext* trace_ctx,
const query::RetrievePlan* Plan,
const int64_t* offsets,
int64_t size) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
tracer::AutoSpan span("RetrieveByOffsets", tracer::GetRootSpan());
auto results = std::make_unique<proto::segcore::RetrieveResults>();
std::chrono::high_resolution_clock::time_point get_target_entry_start =
@ -329,7 +329,7 @@ SegmentInternalInterface::get_field_avg_size(FieldId field_id) const {
auto& field_meta = schema[field_id];
auto data_type = field_meta.get_data_type();
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lck(mutex_);
if (IsVariableDataType(data_type)) {
if (variable_fields_avg_size_.find(field_id) ==
variable_fields_avg_size_.end()) {
@ -352,7 +352,7 @@ SegmentInternalInterface::set_field_avg_size(FieldId field_id,
auto& field_meta = schema[field_id];
auto data_type = field_meta.get_data_type();
folly::SharedMutex::WriteHolder lck(&mutex_);
std::unique_lock lck(mutex_);
if (IsVariableDataType(data_type)) {
AssertInfo(num_rows > 0,
"The num rows of field data should be greater than 0");
@ -418,7 +418,7 @@ SegmentInternalInterface::GetSkipIndex() const {
index::TextMatchIndex*
SegmentInternalInterface::GetTextIndex(FieldId field_id) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lock(mutex_);
auto iter = text_indexes_.find(field_id);
if (iter == text_indexes_.end()) {
throw SegcoreError(
@ -528,7 +528,7 @@ SegmentInternalInterface::bulk_subscript_not_exist_field(
index::JsonKeyStatsInvertedIndex*
SegmentInternalInterface::GetJsonKeyIndex(FieldId field_id) const {
folly::SharedMutex::ReadHolder lck(&mutex_);
std::shared_lock lock(mutex_);
auto iter = json_indexes_.find(field_id);
if (iter == json_indexes_.end()) {
return nullptr;

View File

@ -31,7 +31,6 @@
#include "common/BitsetView.h"
#include "common/QueryResult.h"
#include "common/QueryInfo.h"
#include "folly/SharedMutex.h"
#include "mmap/ChunkedColumnInterface.h"
#include "index/Index.h"
#include "index/JsonFlatIndex.h"
@ -591,8 +590,8 @@ class SegmentInternalInterface : public SegmentInterface {
protected:
// mutex protecting rw options on schema_
std::shared_mutex sch_mutex_;
mutable folly::SharedMutex mutex_;
mutable std::shared_mutex mutex_;
// fieldID -> std::pair<num_rows, avg_size>
std::unordered_map<FieldId, std::pair<int64_t, int64_t>>
variable_fields_avg_size_; // bytes;