From fce0bbe2ae513856e64fcd8f4e101797e61e2a25 Mon Sep 17 00:00:00 2001 From: Spade A <71589810+SpadeA-Tang@users.noreply.github.com> Date: Fri, 4 Jul 2025 10:10:45 +0800 Subject: [PATCH] fix: remove redundant locks for null_offset (#43103) Ref: https://github.com/milvus-io/milvus/issues/40308 https://github.com/milvus-io/milvus/pull/40363 add lock for protecting concurrent read/write for null offset. But we don't need this for sealed segment. --------- Signed-off-by: SpadeA --- .../core/src/index/InvertedIndexTantivy.cpp | 65 +++++++++++++------ .../core/src/index/InvertedIndexTantivy.h | 9 +++ internal/core/src/index/JsonInvertedIndex.cpp | 5 +- .../src/index/JsonKeyStatsInvertedIndex.cpp | 2 + .../core/src/index/NgramInvertedIndex.cpp | 1 - internal/core/src/index/TextMatchIndex.cpp | 34 ++++------ internal/core/src/index/TextMatchIndex.h | 6 +- .../src/segcore/ChunkedSegmentSealedImpl.cpp | 6 +- .../core/src/segcore/SegmentGrowingImpl.cpp | 2 +- internal/core/unittest/test_text_match.cpp | 6 +- 10 files changed, 80 insertions(+), 56 deletions(-) diff --git a/internal/core/src/index/InvertedIndexTantivy.cpp b/internal/core/src/index/InvertedIndexTantivy.cpp index fb1dd338f7..75c99ec0ce 100644 --- a/internal/core/src/index/InvertedIndexTantivy.cpp +++ b/internal/core/src/index/InvertedIndexTantivy.cpp @@ -206,7 +206,6 @@ InvertedIndexTantivy::Load(milvus::tracer::TraceContext ctx, }; auto fill_null_offsets = [&](const uint8_t* data, int64_t size) { - folly::SharedMutex::WriteHolder lock(mutex_); null_offset_.resize((size_t)size / sizeof(size_t)); memcpy(null_offset_.data(), data, (size_t)size); }; @@ -287,12 +286,22 @@ const TargetBitmap InvertedIndexTantivy::IsNull() { int64_t count = Count(); TargetBitmap bitset(count); - folly::SharedMutex::ReadHolder lock(mutex_); - auto end = - std::lower_bound(null_offset_.begin(), null_offset_.end(), count); - for (auto iter = null_offset_.begin(); iter != end; ++iter) { - bitset.set(*iter); + + auto fill_bitset = [this, count, &bitset]() { + auto end = + std::lower_bound(null_offset_.begin(), null_offset_.end(), count); + for (auto iter = null_offset_.begin(); iter != end; ++iter) { + bitset.set(*iter); + } + }; + + if (is_growing_) { + folly::SharedMutex::ReadHolder lock(mutex_); + fill_bitset(); + } else { + fill_bitset(); } + return bitset; } @@ -301,12 +310,22 @@ const TargetBitmap InvertedIndexTantivy::IsNotNull() { int64_t count = Count(); TargetBitmap bitset(count, true); - folly::SharedMutex::ReadHolder lock(mutex_); - auto end = - std::lower_bound(null_offset_.begin(), null_offset_.end(), count); - for (auto iter = null_offset_.begin(); iter != end; ++iter) { - bitset.reset(*iter); + + auto fill_bitset = [this, count, &bitset]() { + auto end = + std::lower_bound(null_offset_.begin(), null_offset_.end(), count); + for (auto iter = null_offset_.begin(); iter != end; ++iter) { + bitset.reset(*iter); + } + }; + + if (is_growing_) { + folly::SharedMutex::ReadHolder lock(mutex_); + fill_bitset(); + } else { + fill_bitset(); } + return bitset; } @@ -346,12 +365,21 @@ InvertedIndexTantivy::NotIn(size_t n, const T* values) { // The expression is "not" in, so we flip the bit. bitset.flip(); - folly::SharedMutex::ReadHolder lock(mutex_); - auto end = - std::lower_bound(null_offset_.begin(), null_offset_.end(), count); - for (auto iter = null_offset_.begin(); iter != end; ++iter) { - bitset.reset(*iter); + auto fill_bitset = [this, count, &bitset]() { + auto end = + std::lower_bound(null_offset_.begin(), null_offset_.end(), count); + for (auto iter = null_offset_.begin(); iter != end; ++iter) { + bitset.reset(*iter); + } + }; + + if (is_growing_) { + folly::SharedMutex::ReadHolder lock(mutex_); + fill_bitset(); + } else { + fill_bitset(); } + return bitset; } @@ -520,7 +548,6 @@ InvertedIndexTantivy::BuildWithFieldData( for (const auto& data : field_datas) { total += data->get_null_count(); } - folly::SharedMutex::WriteHolder lock(mutex_); null_offset_.reserve(total); } switch (schema_.data_type()) { @@ -542,7 +569,6 @@ InvertedIndexTantivy::BuildWithFieldData( auto n = data->get_num_rows(); for (int i = 0; i < n; i++) { if (!data->is_valid(i)) { - folly::SharedMutex::WriteHolder lock(mutex_); null_offset_.push_back(offset); } wrapper_->add_array_data( @@ -565,7 +591,6 @@ InvertedIndexTantivy::BuildWithFieldData( if (schema_.nullable()) { for (int i = 0; i < n; i++) { if (!data->is_valid(i)) { - folly::SharedMutex::WriteHolder lock(mutex_); null_offset_.push_back(i); } wrapper_ @@ -613,7 +638,6 @@ InvertedIndexTantivy::build_index_for_array( auto array_column = static_cast(data->Data()); for (int64_t i = 0; i < n; i++) { if (schema_.nullable() && !data->is_valid(i)) { - folly::SharedMutex::WriteHolder lock(mutex_); null_offset_.push_back(offset); } auto length = data->is_valid(i) ? array_column[i].length() : 0; @@ -644,7 +668,6 @@ InvertedIndexTantivy::build_index_for_array( auto array_column = static_cast(data->Data()); for (int64_t i = 0; i < n; i++) { if (schema_.nullable() && !data->is_valid(i)) { - folly::SharedMutex::WriteHolder lock(mutex_); null_offset_.push_back(offset); } else { Assert(IsStringDataType(array_column[i].get_element_type())); diff --git a/internal/core/src/index/InvertedIndexTantivy.h b/internal/core/src/index/InvertedIndexTantivy.h index a101fb0172..f4060d824f 100644 --- a/internal/core/src/index/InvertedIndexTantivy.h +++ b/internal/core/src/index/InvertedIndexTantivy.h @@ -245,6 +245,11 @@ class InvertedIndexTantivy : public ScalarIndex { void BuildWithFieldData(const std::vector& datas) override; + void + set_is_growing(bool is_growing) { + is_growing_ = is_growing; + } + protected: void finish(); @@ -300,5 +305,9 @@ class InvertedIndexTantivy : public ScalarIndex { // built from a higher version of tantivy which is not supported. // Therefore, we should provide a way to allow higher version of milvus to build tantivy index with low version. uint32_t tantivy_index_version_{0}; + + // for now, only TextMatchIndex and JsonKeyStatsInvertedIndex can be built for growing segment, + // and can read and insert concurrently. + bool is_growing_{false}; }; } // namespace milvus::index diff --git a/internal/core/src/index/JsonInvertedIndex.cpp b/internal/core/src/index/JsonInvertedIndex.cpp index cf0b8287b1..ba6ec97a35 100644 --- a/internal/core/src/index/JsonInvertedIndex.cpp +++ b/internal/core/src/index/JsonInvertedIndex.cpp @@ -56,10 +56,7 @@ JsonInvertedIndex::build_index_for_json( for (int64_t i = 0; i < n; i++) { auto json_column = static_cast(data->RawValue(i)); if (this->schema_.nullable() && !data->is_valid(i)) { - { - folly::SharedMutex::WriteHolder lock(this->mutex_); - this->null_offset_.push_back(offset); - } + this->null_offset_.push_back(offset); this->wrapper_->template add_array_data( nullptr, 0, offset++); continue; diff --git a/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp b/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp index 4e2effef08..e1edb88384 100644 --- a/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp +++ b/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp @@ -309,6 +309,7 @@ JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex( last_commit_time_(stdclock::now()) { wrapper_ = std::make_shared( unique_id, "", TANTIVY_INDEX_LATEST_VERSION, true /* in_ram */); + set_is_growing(true); } JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex( @@ -323,6 +324,7 @@ JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex( boost::filesystem::create_directories(path_); wrapper_ = std::make_shared( unique_id, path_.c_str(), TANTIVY_INDEX_LATEST_VERSION); + set_is_growing(true); } IndexStatsPtr diff --git a/internal/core/src/index/NgramInvertedIndex.cpp b/internal/core/src/index/NgramInvertedIndex.cpp index c1d1d51f3b..9eb741096e 100644 --- a/internal/core/src/index/NgramInvertedIndex.cpp +++ b/internal/core/src/index/NgramInvertedIndex.cpp @@ -81,7 +81,6 @@ NgramInvertedIndex::Load(milvus::tracer::TraceContext ctx, BinarySet binary_set; AssembleIndexDatas(index_datas, binary_set); auto index_valid_data = binary_set.GetByName("index_null_offset"); - folly::SharedMutex::WriteHolder lock(mutex_); null_offset_.resize((size_t)index_valid_data->size / sizeof(size_t)); memcpy(null_offset_.data(), index_valid_data->data.get(), diff --git a/internal/core/src/index/TextMatchIndex.cpp b/internal/core/src/index/TextMatchIndex.cpp index 92bfce7623..9069ac2094 100644 --- a/internal/core/src/index/TextMatchIndex.cpp +++ b/internal/core/src/index/TextMatchIndex.cpp @@ -34,6 +34,7 @@ TextMatchIndex::TextMatchIndex(int64_t commit_interval_in_ms, , tokenizer_name, analyzer_params); + set_is_growing(true); } TextMatchIndex::TextMatchIndex(const std::string& path, @@ -152,7 +153,6 @@ TextMatchIndex::Load(const Config& config) { BinarySet binary_set; AssembleIndexDatas(index_datas, binary_set); auto index_valid_data = binary_set.GetByName("index_null_offset"); - folly::SharedMutex::WriteHolder lock(mutex_); null_offset_.resize((size_t)index_valid_data->size / sizeof(size_t)); memcpy(null_offset_.data(), index_valid_data->data.get(), @@ -175,39 +175,33 @@ TextMatchIndex::Load(const Config& config) { } } +// Add text for sealed segment void -TextMatchIndex::AddText(const std::string& text, - const bool valid, - int64_t offset) { +TextMatchIndex::AddTextSealed(const std::string& text, + const bool valid, + int64_t offset) { if (!valid) { - AddNull(offset); - if (shouldTriggerCommit()) { - Commit(); - } + AddNullSealed(offset); return; } wrapper_->add_data(&text, 1, offset); - if (shouldTriggerCommit()) { - Commit(); - } } +// Add null for sealed segment void -TextMatchIndex::AddNull(int64_t offset) { - { - folly::SharedMutex::WriteHolder lock(mutex_); - null_offset_.push_back(offset); - } +TextMatchIndex::AddNullSealed(int64_t offset) { + null_offset_.push_back(offset); // still need to add null to make offset is correct std::string empty = ""; wrapper_->add_array_data(&empty, 0, offset); } +// Add texts for growing segment void -TextMatchIndex::AddTexts(size_t n, - const std::string* texts, - const bool* valids, - int64_t offset_begin) { +TextMatchIndex::AddTextsGrowing(size_t n, + const std::string* texts, + const bool* valids, + int64_t offset_begin) { if (valids != nullptr) { for (int i = 0; i < n; i++) { auto offset = i + offset_begin; diff --git a/internal/core/src/index/TextMatchIndex.h b/internal/core/src/index/TextMatchIndex.h index 10827cb19e..a01841f88e 100644 --- a/internal/core/src/index/TextMatchIndex.h +++ b/internal/core/src/index/TextMatchIndex.h @@ -50,13 +50,13 @@ class TextMatchIndex : public InvertedIndexTantivy { public: void - AddText(const std::string& text, const bool valid, int64_t offset); + AddTextSealed(const std::string& text, const bool valid, int64_t offset); void - AddNull(int64_t offset); + AddNullSealed(int64_t offset); void - AddTexts(size_t n, + AddTextsGrowing(size_t n, const std::string* texts, const bool* valids, int64_t offset_begin); diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index e36468ad2b..f619057621 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -1191,7 +1191,7 @@ ChunkedSegmentSealedImpl::CreateTextIndex(FieldId field_id) { if (iter != fields_.end()) { iter->second->BulkRawStringAt( [&](std::string_view value, size_t offset, bool is_valid) { - index->AddText(std::string(value), is_valid, offset); + index->AddTextSealed(std::string(value), is_valid, offset); }); } else { // fetch raw data from index. auto field_index_iter = scalar_indexings_.find(field_id); @@ -1212,9 +1212,9 @@ ChunkedSegmentSealedImpl::CreateTextIndex(FieldId field_id) { for (size_t i = 0; i < n; i++) { auto raw = impl->Reverse_Lookup(i); if (!raw.has_value()) { - index->AddNull(i); + index->AddNullSealed(i); } - index->AddText(raw.value(), true, i); + index->AddTextSealed(raw.value(), true, i); } } } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 2b1f21ad7b..b4febc00dd 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -1167,7 +1167,7 @@ SegmentGrowingImpl::AddTexts(milvus::FieldId field_id, ErrorCode::TextIndexNotFound, fmt::format("text index not found for field {}", field_id.get())); } - iter->second->AddTexts(n, texts, texts_valid_data, offset_begin); + iter->second->AddTextsGrowing(n, texts, texts_valid_data, offset_begin); } void diff --git a/internal/core/unittest/test_text_match.cpp b/internal/core/unittest/test_text_match.cpp index d43159bcc9..1899693f29 100644 --- a/internal/core/unittest/test_text_match.cpp +++ b/internal/core/unittest/test_text_match.cpp @@ -156,9 +156,9 @@ TEST(TextMatch, Index) { "milvus_tokenizer", "{}"); index->CreateReader(milvus::index::SetBitsetSealed); - index->AddText("football, basketball, pingpang", true, 0); - index->AddText("", false, 1); - index->AddText("swimming, football", true, 2); + index->AddTextSealed("football, basketball, pingpang", true, 0); + index->AddTextSealed("", false, 1); + index->AddTextSealed("swimming, football", true, 2); index->Commit(); index->Reload();