diff --git a/internal/core/src/index/InvertedIndexTantivy.cpp b/internal/core/src/index/InvertedIndexTantivy.cpp index 7b45f32696..49f7874180 100644 --- a/internal/core/src/index/InvertedIndexTantivy.cpp +++ b/internal/core/src/index/InvertedIndexTantivy.cpp @@ -123,10 +123,13 @@ InvertedIndexTantivy::finish() { template BinarySet InvertedIndexTantivy::Serialize(const Config& config) { - auto index_valid_data_length = null_offset.size() * sizeof(size_t); + folly::SharedMutex::ReadHolder lock(mutex_); + auto index_valid_data_length = null_offset_.size() * sizeof(size_t); std::shared_ptr index_valid_data( new uint8_t[index_valid_data_length]); - memcpy(index_valid_data.get(), null_offset.data(), index_valid_data_length); + memcpy( + index_valid_data.get(), null_offset_.data(), index_valid_data_length); + lock.unlock(); BinarySet res_set; if (index_valid_data_length > 0) { res_set.Append( @@ -229,8 +232,9 @@ InvertedIndexTantivy::Load(milvus::tracer::TraceContext ctx, binary_set.Append(key, buf, size); } auto index_valid_data = binary_set.GetByName("index_null_offset"); - null_offset.resize((size_t)index_valid_data->size / sizeof(size_t)); - memcpy(null_offset.data(), + folly::SharedMutex::WriteHolder lock(mutex_); + null_offset_.resize((size_t)index_valid_data->size / sizeof(size_t)); + memcpy(null_offset_.data(), index_valid_data->data.get(), (size_t)index_valid_data->size); } @@ -253,10 +257,13 @@ InvertedIndexTantivy::In(size_t n, const T* values) { template const TargetBitmap InvertedIndexTantivy::IsNull() { - TargetBitmap bitset(Count()); - - for (size_t i = 0; i < null_offset.size(); ++i) { - bitset.set(null_offset[i]); + int64_t count = Count(); + TargetBitmap bitset(count); + folly::SharedMutex::ReadHolder lock(mutex_); + auto end = + std::lower_bound(null_offset_.begin(), null_offset_.end(), count); + for (auto iter = null_offset_.begin(); iter != end; ++iter) { + bitset.set(*iter); } return bitset; } @@ -264,9 +271,13 @@ InvertedIndexTantivy::IsNull() { 
template const TargetBitmap InvertedIndexTantivy::IsNotNull() { - TargetBitmap bitset(Count(), true); - for (size_t i = 0; i < null_offset.size(); ++i) { - bitset.reset(null_offset[i]); + int64_t count = Count(); + TargetBitmap bitset(count, true); + folly::SharedMutex::ReadHolder lock(mutex_); + auto end = + std::lower_bound(null_offset_.begin(), null_offset_.end(), count); + for (auto iter = null_offset_.begin(); iter != end; ++iter) { + bitset.reset(*iter); } return bitset; } @@ -296,13 +307,18 @@ InvertedIndexTantivy::InApplyCallback( template const TargetBitmap InvertedIndexTantivy::NotIn(size_t n, const T* values) { - TargetBitmap bitset(Count(), true); + int64_t count = Count(); + TargetBitmap bitset(count, true); for (size_t i = 0; i < n; ++i) { auto array = wrapper_->term_query(values[i]); apply_hits(bitset, array, false); } - for (size_t i = 0; i < null_offset.size(); ++i) { - bitset.reset(null_offset[i]); + + folly::SharedMutex::ReadHolder lock(mutex_); + auto end = + std::lower_bound(null_offset_.begin(), null_offset_.end(), count); + for (auto iter = null_offset_.begin(); iter != end; ++iter) { + bitset.reset(*iter); } return bitset; } @@ -466,7 +482,8 @@ InvertedIndexTantivy::BuildWithFieldData( for (const auto& data : field_datas) { total += data->get_null_count(); } - null_offset.reserve(total); + folly::SharedMutex::WriteHolder lock(mutex_); + null_offset_.reserve(total); } switch (schema_.data_type()) { case proto::schema::DataType::Bool: @@ -487,7 +504,8 @@ InvertedIndexTantivy::BuildWithFieldData( auto n = data->get_num_rows(); for (int i = 0; i < n; i++) { if (!data->is_valid(i)) { - null_offset.push_back(i); + folly::SharedMutex::WriteHolder lock(mutex_); + null_offset_.push_back(i); } wrapper_->add_multi_data( static_cast(data->RawValue(i)), @@ -509,7 +527,8 @@ InvertedIndexTantivy::BuildWithFieldData( if (schema_.nullable()) { for (int i = 0; i < n; i++) { if (!data->is_valid(i)) { - null_offset.push_back(i); + 
folly::SharedMutex::WriteHolder lock(mutex_); + null_offset_.push_back(i); } wrapper_ ->add_multi_data_by_single_segment_writer( @@ -547,7 +566,8 @@ InvertedIndexTantivy::build_index_for_array( auto array_column = static_cast(data->Data()); for (int64_t i = 0; i < n; i++) { if (schema_.nullable() && !data->is_valid(i)) { - null_offset.push_back(i); + folly::SharedMutex::WriteHolder lock(mutex_); + null_offset_.push_back(i); } auto length = data->is_valid(i) ? array_column[i].length() : 0; if (!inverted_index_single_segment_) { @@ -576,7 +596,8 @@ InvertedIndexTantivy::build_index_for_array( Assert(IsStringDataType( static_cast(schema_.element_type()))); if (schema_.nullable() && !data->is_valid(i)) { - null_offset.push_back(i); + folly::SharedMutex::WriteHolder lock(mutex_); + null_offset_.push_back(i); } std::vector output; for (int64_t j = 0; j < array_column[i].length(); j++) { diff --git a/internal/core/src/index/InvertedIndexTantivy.h b/internal/core/src/index/InvertedIndexTantivy.h index 656202347f..64fafe1a3a 100644 --- a/internal/core/src/index/InvertedIndexTantivy.h +++ b/internal/core/src/index/InvertedIndexTantivy.h @@ -13,6 +13,7 @@ #include #include +#include #include "common/RegexQuery.h" #include "index/Index.h" #include "storage/FileManager.h" @@ -203,9 +204,10 @@ class InvertedIndexTantivy : public ScalarIndex { MemFileManagerPtr mem_file_manager_; DiskFileManagerPtr disk_file_manager_; + folly::SharedMutexWritePriority mutex_{}; // all data need to be built to align the offset - // so need to store null_offset in inverted index additionally - std::vector null_offset{}; + // so need to store null_offset_ in inverted index additionally + std::vector null_offset_{}; // `inverted_index_single_segment_` is used to control whether to build tantivy index with single segment. 
// diff --git a/internal/core/src/index/TextMatchIndex.cpp b/internal/core/src/index/TextMatchIndex.cpp index 9bf804b82d..ee69d91044 100644 --- a/internal/core/src/index/TextMatchIndex.cpp +++ b/internal/core/src/index/TextMatchIndex.cpp @@ -147,8 +147,9 @@ TextMatchIndex::Load(const Config& config) { binary_set.Append(key, buf, size); } auto index_valid_data = binary_set.GetByName("index_null_offset"); - null_offset.resize((size_t)index_valid_data->size / sizeof(size_t)); - memcpy(null_offset.data(), + folly::SharedMutex::WriteHolder lock(mutex_); + null_offset_.resize((size_t)index_valid_data->size / sizeof(size_t)); + memcpy(null_offset_.data(), index_valid_data->data.get(), (size_t)index_valid_data->size); } @@ -177,7 +178,10 @@ TextMatchIndex::AddText(const std::string& text, void TextMatchIndex::AddNull(int64_t offset) { - null_offset.push_back(offset); + { + folly::SharedMutex::WriteHolder lock(mutex_); + null_offset_.push_back(offset); + } // still need to add null to make offset is correct std::string empty = ""; wrapper_->add_multi_data(&empty, 0, offset); @@ -192,7 +196,8 @@ TextMatchIndex::AddTexts(size_t n, for (int i = 0; i < n; i++) { auto offset = i + offset_begin; if (!valids[i]) { - null_offset.push_back(offset); + folly::SharedMutex::WriteHolder lock(mutex_); + null_offset_.push_back(offset); } } } @@ -212,12 +217,16 @@ TextMatchIndex::BuildIndexFromFieldData( for (const auto& data : field_datas) { total += data->get_null_count(); } - null_offset.reserve(total); + { + folly::SharedMutex::WriteHolder lock(mutex_); + null_offset_.reserve(total); + } for (const auto& data : field_datas) { auto n = data->get_num_rows(); for (int i = 0; i < n; i++) { if (!data->is_valid(i)) { - null_offset.push_back(i); + folly::SharedMutex::WriteHolder lock(mutex_); + null_offset_.push_back(i); } wrapper_->add_data( static_cast(data->RawValue(i)), diff --git a/internal/core/unittest/test_text_match.cpp b/internal/core/unittest/test_text_match.cpp index 
f7b6366385..9c2998bb5e 100644 --- a/internal/core/unittest/test_text_match.cpp +++ b/internal/core/unittest/test_text_match.cpp @@ -813,4 +813,64 @@ TEST(TextMatch, GrowingLoadData) { ASSERT_FALSE(final[4]); ASSERT_TRUE(final[5]); ASSERT_FALSE(final[6]); +} + +// Inserts N rows (all but the last marked invalid) while a second thread repeatedly runs a text-match query until only the last row matches. +TEST(TextMatch, ConcurrentReadWriteWithNull) { + auto schema = GenTestSchema({}, true); + auto seg = CreateGrowingSegment(schema, empty_index_meta); + int64_t N = 1000; + uint64_t seed = 19190504; + auto raw_data = DataGen(schema, N, seed); + auto str_col_valid = + raw_data.raw_->mutable_fields_data()->at(1).mutable_valid_data(); + auto str_col = raw_data.raw_->mutable_fields_data() + ->at(1) + .mutable_scalars() + ->mutable_string_data() + ->mutable_data(); + for (int64_t i = 0; i < N - 1; i++) { + str_col->at(i) = ""; + } + str_col->at(N - 1) = "football"; + for (int64_t i = 0; i < N - 1; i++) { + str_col_valid->at(i) = false; + } + + std::thread writer([&seg, &raw_data, N]() { + seg->PreInsert(N); + seg->Insert(0, + N, + raw_data.row_ids_.data(), + raw_data.timestamps_.data(), + raw_data.raw_); + }); + + std::thread reader([&seg, N]() { + auto start = std::chrono::steady_clock::now(); + const std::chrono::seconds timeout_duration{2}; + while (true) { + // Fail once more than timeout_duration has elapsed since start. + if (std::chrono::steady_clock::now() - start > + timeout_duration) { + ASSERT_TRUE(false) + << "Failed to get valid results within timeout"; + break; + } + BitsetType final; + auto expr = GetTextMatchExpr(GenTestSchema(), "football"); + final = ExecuteQueryExpr(expr, seg.get(), N, MAX_TIMESTAMP); + if (final.size() != N || !final[N - 1]) { + continue; + } + for (int64_t i = 0; i < N - 1; i++) { + ASSERT_FALSE(final[i]); + } + ASSERT_TRUE(final[N - 1]); + break; + } + }); + + writer.join(); + reader.join(); } \ No newline at end of file