mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Issue: https://github.com/milvus-io/milvus/issues/40308 | Cherry-pick: https://github.com/milvus-io/milvus/pull/40363 | Signed-off-by: SpadeA <tangchenjie1210@gmail.com>
This commit is contained in:
parent
a0f18e8c88
commit
345ca01634
@ -123,10 +123,13 @@ InvertedIndexTantivy<T>::finish() {
|
||||
template <typename T>
|
||||
BinarySet
|
||||
InvertedIndexTantivy<T>::Serialize(const Config& config) {
|
||||
auto index_valid_data_length = null_offset.size() * sizeof(size_t);
|
||||
folly::SharedMutex::ReadHolder lock(mutex_);
|
||||
auto index_valid_data_length = null_offset_.size() * sizeof(size_t);
|
||||
std::shared_ptr<uint8_t[]> index_valid_data(
|
||||
new uint8_t[index_valid_data_length]);
|
||||
memcpy(index_valid_data.get(), null_offset.data(), index_valid_data_length);
|
||||
memcpy(
|
||||
index_valid_data.get(), null_offset_.data(), index_valid_data_length);
|
||||
lock.unlock();
|
||||
BinarySet res_set;
|
||||
if (index_valid_data_length > 0) {
|
||||
res_set.Append(
|
||||
@ -229,8 +232,9 @@ InvertedIndexTantivy<T>::Load(milvus::tracer::TraceContext ctx,
|
||||
binary_set.Append(key, buf, size);
|
||||
}
|
||||
auto index_valid_data = binary_set.GetByName("index_null_offset");
|
||||
null_offset.resize((size_t)index_valid_data->size / sizeof(size_t));
|
||||
memcpy(null_offset.data(),
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.resize((size_t)index_valid_data->size / sizeof(size_t));
|
||||
memcpy(null_offset_.data(),
|
||||
index_valid_data->data.get(),
|
||||
(size_t)index_valid_data->size);
|
||||
}
|
||||
@ -253,10 +257,13 @@ InvertedIndexTantivy<T>::In(size_t n, const T* values) {
|
||||
template <typename T>
|
||||
const TargetBitmap
|
||||
InvertedIndexTantivy<T>::IsNull() {
|
||||
TargetBitmap bitset(Count());
|
||||
|
||||
for (size_t i = 0; i < null_offset.size(); ++i) {
|
||||
bitset.set(null_offset[i]);
|
||||
int64_t count = Count();
|
||||
TargetBitmap bitset(count);
|
||||
folly::SharedMutex::ReadHolder lock(mutex_);
|
||||
auto end =
|
||||
std::lower_bound(null_offset_.begin(), null_offset_.end(), count);
|
||||
for (auto iter = null_offset_.begin(); iter != end; ++iter) {
|
||||
bitset.set(*iter);
|
||||
}
|
||||
return bitset;
|
||||
}
|
||||
@ -264,9 +271,13 @@ InvertedIndexTantivy<T>::IsNull() {
|
||||
template <typename T>
|
||||
const TargetBitmap
|
||||
InvertedIndexTantivy<T>::IsNotNull() {
|
||||
TargetBitmap bitset(Count(), true);
|
||||
for (size_t i = 0; i < null_offset.size(); ++i) {
|
||||
bitset.reset(null_offset[i]);
|
||||
int64_t count = Count();
|
||||
TargetBitmap bitset(count, true);
|
||||
folly::SharedMutex::ReadHolder lock(mutex_);
|
||||
auto end =
|
||||
std::lower_bound(null_offset_.begin(), null_offset_.end(), count);
|
||||
for (auto iter = null_offset_.begin(); iter != end; ++iter) {
|
||||
bitset.reset(*iter);
|
||||
}
|
||||
return bitset;
|
||||
}
|
||||
@ -296,13 +307,18 @@ InvertedIndexTantivy<T>::InApplyCallback(
|
||||
template <typename T>
|
||||
const TargetBitmap
|
||||
InvertedIndexTantivy<T>::NotIn(size_t n, const T* values) {
|
||||
TargetBitmap bitset(Count(), true);
|
||||
int64_t count = Count();
|
||||
TargetBitmap bitset(count, true);
|
||||
for (size_t i = 0; i < n; ++i) {
|
||||
auto array = wrapper_->term_query(values[i]);
|
||||
apply_hits(bitset, array, false);
|
||||
}
|
||||
for (size_t i = 0; i < null_offset.size(); ++i) {
|
||||
bitset.reset(null_offset[i]);
|
||||
|
||||
folly::SharedMutex::ReadHolder lock(mutex_);
|
||||
auto end =
|
||||
std::lower_bound(null_offset_.begin(), null_offset_.end(), count);
|
||||
for (auto iter = null_offset_.begin(); iter != end; ++iter) {
|
||||
bitset.reset(*iter);
|
||||
}
|
||||
return bitset;
|
||||
}
|
||||
@ -466,7 +482,8 @@ InvertedIndexTantivy<T>::BuildWithFieldData(
|
||||
for (const auto& data : field_datas) {
|
||||
total += data->get_null_count();
|
||||
}
|
||||
null_offset.reserve(total);
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.reserve(total);
|
||||
}
|
||||
switch (schema_.data_type()) {
|
||||
case proto::schema::DataType::Bool:
|
||||
@ -487,7 +504,8 @@ InvertedIndexTantivy<T>::BuildWithFieldData(
|
||||
auto n = data->get_num_rows();
|
||||
for (int i = 0; i < n; i++) {
|
||||
if (!data->is_valid(i)) {
|
||||
null_offset.push_back(i);
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.push_back(i);
|
||||
}
|
||||
wrapper_->add_multi_data<T>(
|
||||
static_cast<const T*>(data->RawValue(i)),
|
||||
@ -509,7 +527,8 @@ InvertedIndexTantivy<T>::BuildWithFieldData(
|
||||
if (schema_.nullable()) {
|
||||
for (int i = 0; i < n; i++) {
|
||||
if (!data->is_valid(i)) {
|
||||
null_offset.push_back(i);
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.push_back(i);
|
||||
}
|
||||
wrapper_
|
||||
->add_multi_data_by_single_segment_writer<T>(
|
||||
@ -547,7 +566,8 @@ InvertedIndexTantivy<T>::build_index_for_array(
|
||||
auto array_column = static_cast<const Array*>(data->Data());
|
||||
for (int64_t i = 0; i < n; i++) {
|
||||
if (schema_.nullable() && !data->is_valid(i)) {
|
||||
null_offset.push_back(i);
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.push_back(i);
|
||||
}
|
||||
auto length = data->is_valid(i) ? array_column[i].length() : 0;
|
||||
if (!inverted_index_single_segment_) {
|
||||
@ -576,7 +596,8 @@ InvertedIndexTantivy<std::string>::build_index_for_array(
|
||||
Assert(IsStringDataType(
|
||||
static_cast<DataType>(schema_.element_type())));
|
||||
if (schema_.nullable() && !data->is_valid(i)) {
|
||||
null_offset.push_back(i);
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.push_back(i);
|
||||
}
|
||||
std::vector<std::string> output;
|
||||
for (int64_t j = 0; j < array_column[i].length(); j++) {
|
||||
|
||||
@ -13,6 +13,7 @@
|
||||
|
||||
#include <cstddef>
|
||||
#include <vector>
|
||||
#include <folly/SharedMutex.h>
|
||||
#include "common/RegexQuery.h"
|
||||
#include "index/Index.h"
|
||||
#include "storage/FileManager.h"
|
||||
@ -203,9 +204,10 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
|
||||
MemFileManagerPtr mem_file_manager_;
|
||||
DiskFileManagerPtr disk_file_manager_;
|
||||
|
||||
folly::SharedMutexWritePriority mutex_{};
|
||||
// all data need to be built to align the offset
|
||||
// so need to store null_offset in inverted index additionally
|
||||
std::vector<size_t> null_offset{};
|
||||
// so need to store null_offset_ in inverted index additionally
|
||||
std::vector<size_t> null_offset_{};
|
||||
|
||||
// `inverted_index_single_segment_` is used to control whether to build tantivy index with single segment.
|
||||
//
|
||||
|
||||
@ -147,8 +147,9 @@ TextMatchIndex::Load(const Config& config) {
|
||||
binary_set.Append(key, buf, size);
|
||||
}
|
||||
auto index_valid_data = binary_set.GetByName("index_null_offset");
|
||||
null_offset.resize((size_t)index_valid_data->size / sizeof(size_t));
|
||||
memcpy(null_offset.data(),
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.resize((size_t)index_valid_data->size / sizeof(size_t));
|
||||
memcpy(null_offset_.data(),
|
||||
index_valid_data->data.get(),
|
||||
(size_t)index_valid_data->size);
|
||||
}
|
||||
@ -177,7 +178,10 @@ TextMatchIndex::AddText(const std::string& text,
|
||||
|
||||
void
|
||||
TextMatchIndex::AddNull(int64_t offset) {
|
||||
null_offset.push_back(offset);
|
||||
{
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.push_back(offset);
|
||||
}
|
||||
// still need to add null to make offset is correct
|
||||
std::string empty = "";
|
||||
wrapper_->add_multi_data(&empty, 0, offset);
|
||||
@ -192,7 +196,8 @@ TextMatchIndex::AddTexts(size_t n,
|
||||
for (int i = 0; i < n; i++) {
|
||||
auto offset = i + offset_begin;
|
||||
if (!valids[i]) {
|
||||
null_offset.push_back(offset);
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.push_back(offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -212,12 +217,16 @@ TextMatchIndex::BuildIndexFromFieldData(
|
||||
for (const auto& data : field_datas) {
|
||||
total += data->get_null_count();
|
||||
}
|
||||
null_offset.reserve(total);
|
||||
{
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.reserve(total);
|
||||
}
|
||||
for (const auto& data : field_datas) {
|
||||
auto n = data->get_num_rows();
|
||||
for (int i = 0; i < n; i++) {
|
||||
if (!data->is_valid(i)) {
|
||||
null_offset.push_back(i);
|
||||
folly::SharedMutex::WriteHolder lock(mutex_);
|
||||
null_offset_.push_back(i);
|
||||
}
|
||||
wrapper_->add_data(
|
||||
static_cast<const std::string*>(data->RawValue(i)),
|
||||
|
||||
@ -813,4 +813,63 @@ TEST(TextMatch, GrowingLoadData) {
|
||||
ASSERT_FALSE(final[4]);
|
||||
ASSERT_TRUE(final[5]);
|
||||
ASSERT_FALSE(final[6]);
|
||||
}
|
||||
|
||||
// Runs an insert and a text-match query concurrently against a growing
// segment whose string column is null for every row except the last one.
// The reader polls until the last row matches "football", then checks that
// no other row matches.
TEST(TextMatch, ConcurrentReadWriteWithNull) {
    auto schema = GenTestSchema({}, true);
    auto seg = CreateGrowingSegment(schema, empty_index_meta);
    int64_t N = 1000;
    uint64_t seed = 19190504;
    auto raw_data = DataGen(schema, N, seed);
    auto str_col_valid =
        raw_data.raw_->mutable_fields_data()->at(1).mutable_valid_data();
    auto str_col = raw_data.raw_->mutable_fields_data()
                       ->at(1)
                       .mutable_scalars()
                       ->mutable_string_data()
                       ->mutable_data();
    // Rows [0, N-1) are empty and flagged invalid; only row N-1 carries text.
    for (int64_t i = 0; i < N - 1; i++) {
        str_col->at(i) = "";
        str_col_valid->at(i) = false;
    }
    str_col->at(N - 1) = "football";

    std::thread writer([&seg, &raw_data, N]() {
        seg->PreInsert(N);
        seg->Insert(0,
                    N,
                    raw_data.row_ids_.data(),
                    raw_data.timestamps_.data(),
                    raw_data.raw_);
    });

    std::thread reader([&seg, N]() {
        // steady_clock is monotonic, so the elapsed time cannot go backwards.
        const auto start = std::chrono::steady_clock::now();
        const std::chrono::seconds timeout_duration{2};
        while (true) {
            // Elapsed time is now - start; the reversed subtraction is never
            // positive and would disable the timeout entirely.
            if (std::chrono::steady_clock::now() - start > timeout_duration) {
                // FAIL() records a fatal failure and returns from the lambda.
                FAIL() << "Failed to get valid results within timeout";
            }
            auto expr = GetTextMatchExpr(GenTestSchema(), "football");
            BitsetType result =
                ExecuteQueryExpr(expr, seg.get(), N, MAX_TIMESTAMP);
            // Retry until the last row is reported as a match.
            if (result.size() != N || !result[N - 1]) {
                continue;
            }
            for (int64_t i = 0; i < N - 1; i++) {
                ASSERT_FALSE(result[i]);
            }
            ASSERT_TRUE(result[N - 1]);
            break;
        }
    });

    writer.join();
    reader.join();
}
|
||||
Loading…
x
Reference in New Issue
Block a user