mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: remove redundant locks for null_offset (#43103)
Ref: https://github.com/milvus-io/milvus/issues/40308 https://github.com/milvus-io/milvus/pull/40363 add lock for protecting concurrent read/write for null offset. But we don't need this for sealed segment. --------- Signed-off-by: SpadeA <tangchenjie1210@gmail.com>
This commit is contained in:
parent
ee053fa244
commit
fce0bbe2ae
@ -206,7 +206,6 @@ InvertedIndexTantivy<T>::Load(milvus::tracer::TraceContext ctx,
|
||||
};
|
||||
|
||||
auto fill_null_offsets = [&](const uint8_t* data, int64_t size) {
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.resize((size_t)size / sizeof(size_t));
|
||||
memcpy(null_offset_.data(), data, (size_t)size);
|
||||
};
|
||||
@ -287,12 +286,22 @@ const TargetBitmap
|
||||
InvertedIndexTantivy<T>::IsNull() {
|
||||
int64_t count = Count();
|
||||
TargetBitmap bitset(count);
|
||||
folly::SharedMutex::ReadHolder lock(mutex_);
|
||||
auto end =
|
||||
std::lower_bound(null_offset_.begin(), null_offset_.end(), count);
|
||||
for (auto iter = null_offset_.begin(); iter != end; ++iter) {
|
||||
bitset.set(*iter);
|
||||
|
||||
auto fill_bitset = [this, count, &bitset]() {
|
||||
auto end =
|
||||
std::lower_bound(null_offset_.begin(), null_offset_.end(), count);
|
||||
for (auto iter = null_offset_.begin(); iter != end; ++iter) {
|
||||
bitset.set(*iter);
|
||||
}
|
||||
};
|
||||
|
||||
if (is_growing_) {
|
||||
folly::SharedMutex::ReadHolder lock(mutex_);
|
||||
fill_bitset();
|
||||
} else {
|
||||
fill_bitset();
|
||||
}
|
||||
|
||||
return bitset;
|
||||
}
|
||||
|
||||
@ -301,12 +310,22 @@ const TargetBitmap
|
||||
InvertedIndexTantivy<T>::IsNotNull() {
|
||||
int64_t count = Count();
|
||||
TargetBitmap bitset(count, true);
|
||||
folly::SharedMutex::ReadHolder lock(mutex_);
|
||||
auto end =
|
||||
std::lower_bound(null_offset_.begin(), null_offset_.end(), count);
|
||||
for (auto iter = null_offset_.begin(); iter != end; ++iter) {
|
||||
bitset.reset(*iter);
|
||||
|
||||
auto fill_bitset = [this, count, &bitset]() {
|
||||
auto end =
|
||||
std::lower_bound(null_offset_.begin(), null_offset_.end(), count);
|
||||
for (auto iter = null_offset_.begin(); iter != end; ++iter) {
|
||||
bitset.reset(*iter);
|
||||
}
|
||||
};
|
||||
|
||||
if (is_growing_) {
|
||||
folly::SharedMutex::ReadHolder lock(mutex_);
|
||||
fill_bitset();
|
||||
} else {
|
||||
fill_bitset();
|
||||
}
|
||||
|
||||
return bitset;
|
||||
}
|
||||
|
||||
@ -346,12 +365,21 @@ InvertedIndexTantivy<T>::NotIn(size_t n, const T* values) {
|
||||
// The expression is "not" in, so we flip the bit.
|
||||
bitset.flip();
|
||||
|
||||
folly::SharedMutex::ReadHolder lock(mutex_);
|
||||
auto end =
|
||||
std::lower_bound(null_offset_.begin(), null_offset_.end(), count);
|
||||
for (auto iter = null_offset_.begin(); iter != end; ++iter) {
|
||||
bitset.reset(*iter);
|
||||
auto fill_bitset = [this, count, &bitset]() {
|
||||
auto end =
|
||||
std::lower_bound(null_offset_.begin(), null_offset_.end(), count);
|
||||
for (auto iter = null_offset_.begin(); iter != end; ++iter) {
|
||||
bitset.reset(*iter);
|
||||
}
|
||||
};
|
||||
|
||||
if (is_growing_) {
|
||||
folly::SharedMutex::ReadHolder lock(mutex_);
|
||||
fill_bitset();
|
||||
} else {
|
||||
fill_bitset();
|
||||
}
|
||||
|
||||
return bitset;
|
||||
}
|
||||
|
||||
@ -520,7 +548,6 @@ InvertedIndexTantivy<T>::BuildWithFieldData(
|
||||
for (const auto& data : field_datas) {
|
||||
total += data->get_null_count();
|
||||
}
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.reserve(total);
|
||||
}
|
||||
switch (schema_.data_type()) {
|
||||
@ -542,7 +569,6 @@ InvertedIndexTantivy<T>::BuildWithFieldData(
|
||||
auto n = data->get_num_rows();
|
||||
for (int i = 0; i < n; i++) {
|
||||
if (!data->is_valid(i)) {
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.push_back(offset);
|
||||
}
|
||||
wrapper_->add_array_data<T>(
|
||||
@ -565,7 +591,6 @@ InvertedIndexTantivy<T>::BuildWithFieldData(
|
||||
if (schema_.nullable()) {
|
||||
for (int i = 0; i < n; i++) {
|
||||
if (!data->is_valid(i)) {
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.push_back(i);
|
||||
}
|
||||
wrapper_
|
||||
@ -613,7 +638,6 @@ InvertedIndexTantivy<T>::build_index_for_array(
|
||||
auto array_column = static_cast<const Array*>(data->Data());
|
||||
for (int64_t i = 0; i < n; i++) {
|
||||
if (schema_.nullable() && !data->is_valid(i)) {
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.push_back(offset);
|
||||
}
|
||||
auto length = data->is_valid(i) ? array_column[i].length() : 0;
|
||||
@ -644,7 +668,6 @@ InvertedIndexTantivy<std::string>::build_index_for_array(
|
||||
auto array_column = static_cast<const Array*>(data->Data());
|
||||
for (int64_t i = 0; i < n; i++) {
|
||||
if (schema_.nullable() && !data->is_valid(i)) {
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.push_back(offset);
|
||||
} else {
|
||||
Assert(IsStringDataType(array_column[i].get_element_type()));
|
||||
|
||||
@ -245,6 +245,11 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
|
||||
void
|
||||
BuildWithFieldData(const std::vector<FieldDataPtr>& datas) override;
|
||||
|
||||
void
|
||||
set_is_growing(bool is_growing) {
|
||||
is_growing_ = is_growing;
|
||||
}
|
||||
|
||||
protected:
|
||||
void
|
||||
finish();
|
||||
@ -300,5 +305,9 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
|
||||
// built from a higher version of tantivy which is not supported.
|
||||
// Therefore, we should provide a way to allow higher version of milvus to build tantivy index with low version.
|
||||
uint32_t tantivy_index_version_{0};
|
||||
|
||||
// for now, only TextMatchIndex and JsonKeyStatsInvertedIndex can be built for growing segment,
|
||||
// and can read and insert concurrently.
|
||||
bool is_growing_{false};
|
||||
};
|
||||
} // namespace milvus::index
|
||||
|
||||
@ -56,10 +56,7 @@ JsonInvertedIndex<T>::build_index_for_json(
|
||||
for (int64_t i = 0; i < n; i++) {
|
||||
auto json_column = static_cast<const Json*>(data->RawValue(i));
|
||||
if (this->schema_.nullable() && !data->is_valid(i)) {
|
||||
{
|
||||
folly::SharedMutex::WriteHolder lock(this->mutex_);
|
||||
this->null_offset_.push_back(offset);
|
||||
}
|
||||
this->null_offset_.push_back(offset);
|
||||
this->wrapper_->template add_array_data<T>(
|
||||
nullptr, 0, offset++);
|
||||
continue;
|
||||
|
||||
@ -309,6 +309,7 @@ JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex(
|
||||
last_commit_time_(stdclock::now()) {
|
||||
wrapper_ = std::make_shared<TantivyIndexWrapper>(
|
||||
unique_id, "", TANTIVY_INDEX_LATEST_VERSION, true /* in_ram */);
|
||||
set_is_growing(true);
|
||||
}
|
||||
|
||||
JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex(
|
||||
@ -323,6 +324,7 @@ JsonKeyStatsInvertedIndex::JsonKeyStatsInvertedIndex(
|
||||
boost::filesystem::create_directories(path_);
|
||||
wrapper_ = std::make_shared<TantivyIndexWrapper>(
|
||||
unique_id, path_.c_str(), TANTIVY_INDEX_LATEST_VERSION);
|
||||
set_is_growing(true);
|
||||
}
|
||||
|
||||
IndexStatsPtr
|
||||
|
||||
@ -81,7 +81,6 @@ NgramInvertedIndex::Load(milvus::tracer::TraceContext ctx,
|
||||
BinarySet binary_set;
|
||||
AssembleIndexDatas(index_datas, binary_set);
|
||||
auto index_valid_data = binary_set.GetByName("index_null_offset");
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.resize((size_t)index_valid_data->size / sizeof(size_t));
|
||||
memcpy(null_offset_.data(),
|
||||
index_valid_data->data.get(),
|
||||
|
||||
@ -34,6 +34,7 @@ TextMatchIndex::TextMatchIndex(int64_t commit_interval_in_ms,
|
||||
,
|
||||
tokenizer_name,
|
||||
analyzer_params);
|
||||
set_is_growing(true);
|
||||
}
|
||||
|
||||
TextMatchIndex::TextMatchIndex(const std::string& path,
|
||||
@ -152,7 +153,6 @@ TextMatchIndex::Load(const Config& config) {
|
||||
BinarySet binary_set;
|
||||
AssembleIndexDatas(index_datas, binary_set);
|
||||
auto index_valid_data = binary_set.GetByName("index_null_offset");
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.resize((size_t)index_valid_data->size / sizeof(size_t));
|
||||
memcpy(null_offset_.data(),
|
||||
index_valid_data->data.get(),
|
||||
@ -175,39 +175,33 @@ TextMatchIndex::Load(const Config& config) {
|
||||
}
|
||||
}
|
||||
|
||||
// Add text for sealed segment
|
||||
void
|
||||
TextMatchIndex::AddText(const std::string& text,
|
||||
const bool valid,
|
||||
int64_t offset) {
|
||||
TextMatchIndex::AddTextSealed(const std::string& text,
|
||||
const bool valid,
|
||||
int64_t offset) {
|
||||
if (!valid) {
|
||||
AddNull(offset);
|
||||
if (shouldTriggerCommit()) {
|
||||
Commit();
|
||||
}
|
||||
AddNullSealed(offset);
|
||||
return;
|
||||
}
|
||||
wrapper_->add_data(&text, 1, offset);
|
||||
if (shouldTriggerCommit()) {
|
||||
Commit();
|
||||
}
|
||||
}
|
||||
|
||||
// Add null for sealed segment
|
||||
void
|
||||
TextMatchIndex::AddNull(int64_t offset) {
|
||||
{
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.push_back(offset);
|
||||
}
|
||||
TextMatchIndex::AddNullSealed(int64_t offset) {
|
||||
null_offset_.push_back(offset);
|
||||
// still need to add null to make offset is correct
|
||||
std::string empty = "";
|
||||
wrapper_->add_array_data(&empty, 0, offset);
|
||||
}
|
||||
|
||||
// Add texts for growing segment
|
||||
void
|
||||
TextMatchIndex::AddTexts(size_t n,
|
||||
const std::string* texts,
|
||||
const bool* valids,
|
||||
int64_t offset_begin) {
|
||||
TextMatchIndex::AddTextsGrowing(size_t n,
|
||||
const std::string* texts,
|
||||
const bool* valids,
|
||||
int64_t offset_begin) {
|
||||
if (valids != nullptr) {
|
||||
for (int i = 0; i < n; i++) {
|
||||
auto offset = i + offset_begin;
|
||||
|
||||
@ -50,13 +50,13 @@ class TextMatchIndex : public InvertedIndexTantivy<std::string> {
|
||||
|
||||
public:
|
||||
void
|
||||
AddText(const std::string& text, const bool valid, int64_t offset);
|
||||
AddTextSealed(const std::string& text, const bool valid, int64_t offset);
|
||||
|
||||
void
|
||||
AddNull(int64_t offset);
|
||||
AddNullSealed(int64_t offset);
|
||||
|
||||
void
|
||||
AddTexts(size_t n,
|
||||
AddTextsGrowing(size_t n,
|
||||
const std::string* texts,
|
||||
const bool* valids,
|
||||
int64_t offset_begin);
|
||||
|
||||
@ -1191,7 +1191,7 @@ ChunkedSegmentSealedImpl::CreateTextIndex(FieldId field_id) {
|
||||
if (iter != fields_.end()) {
|
||||
iter->second->BulkRawStringAt(
|
||||
[&](std::string_view value, size_t offset, bool is_valid) {
|
||||
index->AddText(std::string(value), is_valid, offset);
|
||||
index->AddTextSealed(std::string(value), is_valid, offset);
|
||||
});
|
||||
} else { // fetch raw data from index.
|
||||
auto field_index_iter = scalar_indexings_.find(field_id);
|
||||
@ -1212,9 +1212,9 @@ ChunkedSegmentSealedImpl::CreateTextIndex(FieldId field_id) {
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto raw = impl->Reverse_Lookup(i);
|
||||
if (!raw.has_value()) {
|
||||
index->AddNull(i);
|
||||
index->AddNullSealed(i);
|
||||
}
|
||||
index->AddText(raw.value(), true, i);
|
||||
index->AddTextSealed(raw.value(), true, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1167,7 +1167,7 @@ SegmentGrowingImpl::AddTexts(milvus::FieldId field_id,
|
||||
ErrorCode::TextIndexNotFound,
|
||||
fmt::format("text index not found for field {}", field_id.get()));
|
||||
}
|
||||
iter->second->AddTexts(n, texts, texts_valid_data, offset_begin);
|
||||
iter->second->AddTextsGrowing(n, texts, texts_valid_data, offset_begin);
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@ -156,9 +156,9 @@ TEST(TextMatch, Index) {
|
||||
"milvus_tokenizer",
|
||||
"{}");
|
||||
index->CreateReader(milvus::index::SetBitsetSealed);
|
||||
index->AddText("football, basketball, pingpang", true, 0);
|
||||
index->AddText("", false, 1);
|
||||
index->AddText("swimming, football", true, 2);
|
||||
index->AddTextSealed("football, basketball, pingpang", true, 0);
|
||||
index->AddTextSealed("", false, 1);
|
||||
index->AddTextSealed("swimming, football", true, 2);
|
||||
index->Commit();
|
||||
index->Reload();
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user