mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
enhance: Add bulk api for json data (#42407)
issue: https://github.com/milvus-io/milvus/issues/42409 --------- Signed-off-by: sunby <sunbingyi1992@gmail.com>
This commit is contained in:
parent
e99d2fc63e
commit
6c16d3dbee
@ -645,82 +645,104 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonForIndex() {
|
||||
val1,
|
||||
val2,
|
||||
lower_inclusive,
|
||||
upper_inclusive](bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
int32_t value) {
|
||||
if (valid) {
|
||||
if constexpr (std::is_same_v<GetType, int64_t>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::INT32) &&
|
||||
type != uint8_t(milvus::index::JSONType::INT64) &&
|
||||
type != uint8_t(milvus::index::JSONType::FLOAT) &&
|
||||
type != uint8_t(milvus::index::JSONType::DOUBLE)) {
|
||||
return false;
|
||||
}
|
||||
} else if constexpr (std::is_same_v<GetType,
|
||||
std::string_view>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::STRING) &&
|
||||
type !=
|
||||
uint8_t(milvus::index::JSONType::STRING_ESCAPE)) {
|
||||
return false;
|
||||
}
|
||||
} else if constexpr (std::is_same_v<GetType, double>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::INT32) &&
|
||||
type != uint8_t(milvus::index::JSONType::INT64) &&
|
||||
type != uint8_t(milvus::index::JSONType::FLOAT) &&
|
||||
type != uint8_t(milvus::index::JSONType::DOUBLE)) {
|
||||
return false;
|
||||
}
|
||||
} else if constexpr (std::is_same_v<GetType, bool>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::BOOL)) {
|
||||
return false;
|
||||
}
|
||||
upper_inclusive](const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
std::vector<int64_t> invalid_row_ids;
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto valid = valid_array[i];
|
||||
auto type = type_array[i];
|
||||
auto row_id = row_id_array[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
auto value = value_array[i];
|
||||
if (!valid) {
|
||||
invalid_row_ids.push_back(row_id_array[i]);
|
||||
continue;
|
||||
}
|
||||
if (lower_inclusive && upper_inclusive) {
|
||||
if (type == uint8_t(milvus::index::JSONType::FLOAT)) {
|
||||
BinaryRangeJSONTypeCompareWithValue(
|
||||
static_cast<float>(val1) <= val &&
|
||||
val <= static_cast<float>(val2));
|
||||
} else {
|
||||
BinaryRangeJSONTypeCompareWithValue(val1 <= val &&
|
||||
val <= val2);
|
||||
|
||||
auto f = [&]() {
|
||||
if constexpr (std::is_same_v<GetType, int64_t>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::INT32) &&
|
||||
type != uint8_t(milvus::index::JSONType::INT64) &&
|
||||
type != uint8_t(milvus::index::JSONType::FLOAT) &&
|
||||
type != uint8_t(milvus::index::JSONType::DOUBLE)) {
|
||||
return false;
|
||||
}
|
||||
} else if constexpr (std::is_same_v<GetType,
|
||||
std::string_view>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::STRING) &&
|
||||
type !=
|
||||
uint8_t(
|
||||
milvus::index::JSONType::STRING_ESCAPE)) {
|
||||
return false;
|
||||
}
|
||||
} else if constexpr (std::is_same_v<GetType, double>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::INT32) &&
|
||||
type != uint8_t(milvus::index::JSONType::INT64) &&
|
||||
type != uint8_t(milvus::index::JSONType::FLOAT) &&
|
||||
type != uint8_t(milvus::index::JSONType::DOUBLE)) {
|
||||
return false;
|
||||
}
|
||||
} else if constexpr (std::is_same_v<GetType, bool>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::BOOL)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} else if (lower_inclusive && !upper_inclusive) {
|
||||
if (type == uint8_t(milvus::index::JSONType::FLOAT)) {
|
||||
BinaryRangeJSONTypeCompareWithValue(
|
||||
static_cast<float>(val1) <= val &&
|
||||
val < static_cast<float>(val2));
|
||||
if (lower_inclusive && upper_inclusive) {
|
||||
if (type == uint8_t(milvus::index::JSONType::FLOAT)) {
|
||||
BinaryRangeJSONTypeCompareWithValue(
|
||||
static_cast<float>(val1) <= val &&
|
||||
val <= static_cast<float>(val2));
|
||||
} else {
|
||||
BinaryRangeJSONTypeCompareWithValue(val1 <= val &&
|
||||
val <= val2);
|
||||
}
|
||||
} else if (lower_inclusive && !upper_inclusive) {
|
||||
if (type == uint8_t(milvus::index::JSONType::FLOAT)) {
|
||||
BinaryRangeJSONTypeCompareWithValue(
|
||||
static_cast<float>(val1) <= val &&
|
||||
val < static_cast<float>(val2));
|
||||
} else {
|
||||
BinaryRangeJSONTypeCompareWithValue(val1 <= val &&
|
||||
val < val2);
|
||||
}
|
||||
} else if (!lower_inclusive && upper_inclusive) {
|
||||
if (type == uint8_t(milvus::index::JSONType::FLOAT)) {
|
||||
BinaryRangeJSONTypeCompareWithValue(
|
||||
static_cast<float>(val1) < val &&
|
||||
val <= static_cast<float>(val2));
|
||||
} else {
|
||||
BinaryRangeJSONTypeCompareWithValue(val1 < val &&
|
||||
val <= val2);
|
||||
}
|
||||
} else {
|
||||
BinaryRangeJSONTypeCompareWithValue(val1 <= val &&
|
||||
val < val2);
|
||||
if (type == uint8_t(milvus::index::JSONType::FLOAT)) {
|
||||
BinaryRangeJSONTypeCompareWithValue(
|
||||
static_cast<float>(val1) < val &&
|
||||
val < static_cast<float>(val2));
|
||||
} else {
|
||||
BinaryRangeJSONTypeCompareWithValue(val1 < val &&
|
||||
val < val2);
|
||||
}
|
||||
}
|
||||
} else if (!lower_inclusive && upper_inclusive) {
|
||||
if (type == uint8_t(milvus::index::JSONType::FLOAT)) {
|
||||
BinaryRangeJSONTypeCompareWithValue(
|
||||
static_cast<float>(val1) < val &&
|
||||
val <= static_cast<float>(val2));
|
||||
} else {
|
||||
BinaryRangeJSONTypeCompareWithValue(val1 < val &&
|
||||
val <= val2);
|
||||
}
|
||||
} else {
|
||||
if (type == uint8_t(milvus::index::JSONType::FLOAT)) {
|
||||
BinaryRangeJSONTypeCompareWithValue(
|
||||
static_cast<float>(val1) < val &&
|
||||
val < static_cast<float>(val2));
|
||||
} else {
|
||||
BinaryRangeJSONTypeCompareWithValue(val1 < val &&
|
||||
val < val2);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
auto json_pair = segment->GetJsonData(field_id, row_id);
|
||||
if (!json_pair.second) {
|
||||
};
|
||||
bitset[row_id] = f();
|
||||
}
|
||||
|
||||
auto f = [&](const milvus::Json& json,
|
||||
uint8_t type,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
bool is_valid) {
|
||||
if (!is_valid) {
|
||||
return false;
|
||||
}
|
||||
auto& json = json_pair.first;
|
||||
if (lower_inclusive && upper_inclusive) {
|
||||
if (type == uint8_t(milvus::index::JSONType::STRING) ||
|
||||
type == uint8_t(milvus::index::JSONType::DOUBLE) ||
|
||||
@ -762,7 +784,18 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonForIndex() {
|
||||
ValueType(val.value()) < val2);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
segment->BulkGetJsonData(
|
||||
field_id,
|
||||
[&](const milvus::Json& json, size_t i, bool is_valid) {
|
||||
auto type = type_array[i];
|
||||
auto row_id = invalid_row_ids[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
bitset[row_id] = f(json, type, offset, size, is_valid);
|
||||
},
|
||||
invalid_row_ids.data(),
|
||||
invalid_row_ids.size());
|
||||
};
|
||||
bool is_growing = segment_->type() == SegmentType::Growing;
|
||||
bool is_strong_consistency = consistency_level_ == 0;
|
||||
|
||||
@ -192,13 +192,19 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegmentForIndex() {
|
||||
auto field_id = expr_->column_.field_id_;
|
||||
auto* index = segment->GetJsonKeyIndex(field_id);
|
||||
Assert(index != nullptr);
|
||||
auto filter_func = [segment, field_id, pointer](bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
uint32_t value) {
|
||||
return true;
|
||||
auto filter_func = [segment, field_id, pointer](
|
||||
const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto row_id = row_id_array[i];
|
||||
bitset[row_id] = true;
|
||||
}
|
||||
};
|
||||
bool is_growing = segment_->type() == SegmentType::Growing;
|
||||
bool is_strong_consistency = consistency_level_ == 0;
|
||||
|
||||
@ -415,22 +415,37 @@ PhyJsonContainsFilterExpr::ExecJsonContainsByKeyIndex() {
|
||||
auto field_id = expr_->column_.field_id_;
|
||||
auto* index = segment->GetJsonKeyIndex(field_id);
|
||||
Assert(index != nullptr);
|
||||
auto filter_func = [this, segment, &field_id](bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
int32_t value) {
|
||||
if (valid) {
|
||||
return false;
|
||||
} else {
|
||||
auto json_pair = segment->GetJsonData(field_id, row_id);
|
||||
if (!json_pair.second) {
|
||||
auto filter_func = [this, segment, &field_id](
|
||||
const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
std::vector<int64_t> invalid_row_ids;
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto valid = valid_array[i];
|
||||
auto type = type_array[i];
|
||||
auto row_id = row_id_array[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
auto value = value_array[i];
|
||||
if (valid) {
|
||||
bitset[row_id] = false;
|
||||
} else {
|
||||
invalid_row_ids.push_back(row_id_array[i]);
|
||||
}
|
||||
}
|
||||
auto f = [&](const milvus::Json& json,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
bool is_valid) {
|
||||
if (!is_valid) {
|
||||
return false;
|
||||
}
|
||||
auto& json = json_pair.first;
|
||||
auto array = json.array_at(offset, size);
|
||||
|
||||
if (array.error()) {
|
||||
return false;
|
||||
}
|
||||
@ -444,7 +459,18 @@ PhyJsonContainsFilterExpr::ExecJsonContainsByKeyIndex() {
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
segment->BulkGetJsonData(
|
||||
field_id,
|
||||
[&](const milvus::Json& json, size_t i, bool is_valid) {
|
||||
auto type = type_array[i];
|
||||
auto row_id = invalid_row_ids[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
bitset[row_id] = f(json, offset, size, is_valid);
|
||||
},
|
||||
invalid_row_ids.data(),
|
||||
invalid_row_ids.size());
|
||||
};
|
||||
bool is_growing = segment_->type() == SegmentType::Growing;
|
||||
bool is_strong_consistency = consistency_level_ == 0;
|
||||
@ -599,20 +625,36 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArrayByKeyIndex() {
|
||||
auto field_id = expr_->column_.field_id_;
|
||||
auto* index = segment->GetJsonKeyIndex(field_id);
|
||||
Assert(index != nullptr);
|
||||
auto filter_func = [segment, &elements, &field_id](bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
int32_t value) {
|
||||
if (valid) {
|
||||
return false;
|
||||
} else {
|
||||
auto json_pair = segment->GetJsonData(field_id, row_id);
|
||||
if (!json_pair.second) {
|
||||
auto filter_func = [segment, &elements, &field_id](
|
||||
const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
std::vector<int64_t> invalid_row_ids;
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto valid = valid_array[i];
|
||||
auto type = type_array[i];
|
||||
auto row_id = row_id_array[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
auto value = value_array[i];
|
||||
if (valid) {
|
||||
bitset[row_id] = false;
|
||||
} else {
|
||||
invalid_row_ids.push_back(row_id_array[i]);
|
||||
}
|
||||
}
|
||||
auto f = [&](const milvus::Json& json,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
bool is_valid) {
|
||||
if (!is_valid) {
|
||||
return false;
|
||||
}
|
||||
auto& json = json_pair.first;
|
||||
auto array = json.array_at(offset, size);
|
||||
if (array.error()) {
|
||||
return false;
|
||||
@ -629,7 +671,18 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArrayByKeyIndex() {
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
segment->BulkGetJsonData(
|
||||
field_id,
|
||||
[&](const milvus::Json& json, size_t i, bool is_valid) {
|
||||
auto type = type_array[i];
|
||||
auto row_id = invalid_row_ids[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
bitset[row_id] = f(json, offset, size, is_valid);
|
||||
},
|
||||
invalid_row_ids.data(),
|
||||
invalid_row_ids.size());
|
||||
};
|
||||
bool is_growing = segment_->type() == SegmentType::Growing;
|
||||
bool is_strong_consistency = consistency_level_ == 0;
|
||||
@ -877,20 +930,36 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByKeyIndex() {
|
||||
auto field_id = expr_->column_.field_id_;
|
||||
auto* index = segment->GetJsonKeyIndex(field_id);
|
||||
Assert(index != nullptr);
|
||||
auto filter_func = [segment, &elements, &field_id](bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
int32_t value) {
|
||||
if (valid) {
|
||||
return false;
|
||||
} else {
|
||||
auto json_pair = segment->GetJsonData(field_id, row_id);
|
||||
if (!json_pair.second) {
|
||||
auto filter_func = [segment, &elements, &field_id](
|
||||
const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
std::vector<int64_t> invalid_row_ids;
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto valid = valid_array[i];
|
||||
auto type = type_array[i];
|
||||
auto row_id = row_id_array[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
auto value = value_array[i];
|
||||
if (valid) {
|
||||
bitset[row_id] = false;
|
||||
} else {
|
||||
invalid_row_ids.push_back(row_id_array[i]);
|
||||
}
|
||||
}
|
||||
auto f = [&](const milvus::Json& json,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
bool is_valid) {
|
||||
if (!is_valid) {
|
||||
return false;
|
||||
}
|
||||
auto& json = json_pair.first;
|
||||
auto array = json.array_at(offset, size);
|
||||
if (array.error()) {
|
||||
return false;
|
||||
@ -907,7 +976,18 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByKeyIndex() {
|
||||
}
|
||||
}
|
||||
return tmp_elements.empty();
|
||||
}
|
||||
};
|
||||
segment->BulkGetJsonData(
|
||||
field_id,
|
||||
[&](const milvus::Json& json, size_t i, bool is_valid) {
|
||||
auto type = type_array[i];
|
||||
auto row_id = invalid_row_ids[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
bitset[row_id] = f(json, offset, size, is_valid);
|
||||
},
|
||||
invalid_row_ids.data(),
|
||||
invalid_row_ids.size());
|
||||
};
|
||||
bool is_growing = segment_->type() == SegmentType::Growing;
|
||||
bool is_strong_consistency = consistency_level_ == 0;
|
||||
@ -1125,20 +1205,35 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByKeyIndex() {
|
||||
auto* index = segment->GetJsonKeyIndex(field_id);
|
||||
Assert(index != nullptr);
|
||||
auto filter_func = [segment, &elements, &elements_index, &field_id](
|
||||
bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
int32_t value) {
|
||||
if (valid) {
|
||||
return false;
|
||||
} else {
|
||||
auto json_pair = segment->GetJsonData(field_id, row_id);
|
||||
if (!json_pair.second) {
|
||||
const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
std::vector<int64_t> invalid_row_ids;
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto valid = valid_array[i];
|
||||
auto type = type_array[i];
|
||||
auto row_id = row_id_array[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
auto value = value_array[i];
|
||||
if (valid) {
|
||||
bitset[row_id] = false;
|
||||
} else {
|
||||
invalid_row_ids.push_back(row_id_array[i]);
|
||||
}
|
||||
}
|
||||
auto f = [&](const milvus::Json& json,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
bool is_valid) {
|
||||
if (!is_valid) {
|
||||
return false;
|
||||
}
|
||||
auto& json = json_pair.first;
|
||||
std::set<int> tmp_elements_index(elements_index);
|
||||
auto array = json.array_at(offset, size);
|
||||
if (array.error()) {
|
||||
@ -1215,7 +1310,18 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByKeyIndex() {
|
||||
}
|
||||
}
|
||||
return tmp_elements_index.size() == 0;
|
||||
}
|
||||
};
|
||||
segment->BulkGetJsonData(
|
||||
field_id,
|
||||
[&](const milvus::Json& json, size_t i, bool is_valid) {
|
||||
auto type = type_array[i];
|
||||
auto row_id = invalid_row_ids[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
bitset[row_id] = f(json, offset, size, is_valid);
|
||||
},
|
||||
invalid_row_ids.data(),
|
||||
invalid_row_ids.size());
|
||||
};
|
||||
bool is_growing = segment_->type() == SegmentType::Growing;
|
||||
bool is_strong_consistency = consistency_level_ == 0;
|
||||
@ -1376,20 +1482,36 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByKeyIndex() {
|
||||
auto field_id = expr_->column_.field_id_;
|
||||
auto* index = segment->GetJsonKeyIndex(field_id);
|
||||
Assert(index != nullptr);
|
||||
auto filter_func = [segment, &elements, &field_id](bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
int32_t value) {
|
||||
if (valid) {
|
||||
return false;
|
||||
} else {
|
||||
auto json_pair = segment->GetJsonData(field_id, row_id);
|
||||
if (!json_pair.second) {
|
||||
auto filter_func = [segment, &elements, &field_id](
|
||||
const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
std::vector<int64_t> invalid_row_ids;
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto valid = valid_array[i];
|
||||
auto type = type_array[i];
|
||||
auto row_id = row_id_array[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
auto value = value_array[i];
|
||||
if (valid) {
|
||||
bitset[row_id] = false;
|
||||
} else {
|
||||
invalid_row_ids.push_back(row_id_array[i]);
|
||||
}
|
||||
}
|
||||
auto f = [&](const milvus::Json& json,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
bool is_valid) {
|
||||
if (!is_valid) {
|
||||
return false;
|
||||
}
|
||||
auto& json = json_pair.first;
|
||||
auto array = json.array_at(offset, size);
|
||||
if (array.error()) {
|
||||
return false;
|
||||
@ -1410,7 +1532,18 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByKeyIndex() {
|
||||
}
|
||||
}
|
||||
return exist_elements_index.size() == elements.size();
|
||||
}
|
||||
};
|
||||
segment->BulkGetJsonData(
|
||||
field_id,
|
||||
[&](const milvus::Json& json, size_t i, bool is_valid) {
|
||||
auto type = type_array[i];
|
||||
auto row_id = invalid_row_ids[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
bitset[row_id] = f(json, offset, size, is_valid);
|
||||
},
|
||||
invalid_row_ids.data(),
|
||||
invalid_row_ids.size());
|
||||
};
|
||||
bool is_growing = segment_->type() == SegmentType::Growing;
|
||||
bool is_strong_consistency = consistency_level_ == 0;
|
||||
@ -1611,20 +1744,36 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByKeyIndex() {
|
||||
auto field_id = expr_->column_.field_id_;
|
||||
auto* index = segment->GetJsonKeyIndex(field_id);
|
||||
Assert(index != nullptr);
|
||||
auto filter_func = [segment, &elements, &field_id](bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
int32_t value) {
|
||||
if (valid) {
|
||||
return false;
|
||||
} else {
|
||||
auto json_pair = segment->GetJsonData(field_id, row_id);
|
||||
if (!json_pair.second) {
|
||||
auto filter_func = [segment, &elements, &field_id](
|
||||
const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
std::vector<int64_t> invalid_row_ids;
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto valid = valid_array[i];
|
||||
auto type = type_array[i];
|
||||
auto row_id = row_id_array[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
auto value = value_array[i];
|
||||
if (valid) {
|
||||
bitset[row_id] = false;
|
||||
} else {
|
||||
invalid_row_ids.push_back(row_id_array[i]);
|
||||
}
|
||||
}
|
||||
auto f = [&](const milvus::Json& json,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
bool is_valid) {
|
||||
if (!is_valid) {
|
||||
return false;
|
||||
}
|
||||
auto& json = json_pair.first;
|
||||
auto array = json.array_at(offset, size);
|
||||
if (array.error()) {
|
||||
return false;
|
||||
@ -1693,7 +1842,18 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByKeyIndex() {
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
segment->BulkGetJsonData(
|
||||
field_id,
|
||||
[&](const milvus::Json& json, size_t i, bool is_valid) {
|
||||
auto type = type_array[i];
|
||||
auto row_id = invalid_row_ids[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
bitset[row_id] = f(json, offset, size, is_valid);
|
||||
},
|
||||
invalid_row_ids.data(),
|
||||
invalid_row_ids.size());
|
||||
};
|
||||
bool is_growing = segment_->type() == SegmentType::Growing;
|
||||
bool is_strong_consistency = consistency_level_ == 0;
|
||||
|
||||
@ -578,54 +578,76 @@ PhyTermFilterExpr::ExecJsonInVariableByKeyIndex() {
|
||||
|
||||
Assert(index != nullptr);
|
||||
|
||||
auto filter_func = [this, segment, &field_id](bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
int32_t value) {
|
||||
if (valid) {
|
||||
if constexpr (std::is_same_v<GetType, int64_t>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::INT32) &&
|
||||
type != uint8_t(milvus::index::JSONType::INT64) &&
|
||||
type != uint8_t(milvus::index::JSONType::FLOAT) &&
|
||||
type != uint8_t(milvus::index::JSONType::DOUBLE)) {
|
||||
return false;
|
||||
}
|
||||
} else if constexpr (std::is_same_v<GetType,
|
||||
std::string_view>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::STRING) &&
|
||||
type !=
|
||||
uint8_t(milvus::index::JSONType::STRING_ESCAPE)) {
|
||||
return false;
|
||||
}
|
||||
} else if constexpr (std::is_same_v<GetType, double>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::INT32) &&
|
||||
type != uint8_t(milvus::index::JSONType::INT64) &&
|
||||
type != uint8_t(milvus::index::JSONType::FLOAT) &&
|
||||
type != uint8_t(milvus::index::JSONType::DOUBLE)) {
|
||||
return false;
|
||||
}
|
||||
} else if constexpr (std::is_same_v<GetType, bool>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::BOOL)) {
|
||||
return false;
|
||||
}
|
||||
auto filter_func = [this, segment, &field_id](
|
||||
const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
std::vector<int64_t> invalid_row_ids;
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto valid = valid_array[i];
|
||||
auto type = type_array[i];
|
||||
auto row_id = row_id_array[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
auto value = value_array[i];
|
||||
if (!valid) {
|
||||
invalid_row_ids.push_back(row_id);
|
||||
continue;
|
||||
}
|
||||
if constexpr (std::is_same_v<GetType, int64_t>) {
|
||||
return this->arg_set_->In(value);
|
||||
} else if constexpr (std::is_same_v<GetType, double>) {
|
||||
float restoredValue = *reinterpret_cast<float*>(&value);
|
||||
return this->arg_set_float_->In(restoredValue);
|
||||
} else if constexpr (std::is_same_v<GetType, bool>) {
|
||||
bool restoredValue = *reinterpret_cast<bool*>(&value);
|
||||
return this->arg_set_->In(restoredValue);
|
||||
}
|
||||
} else {
|
||||
auto json_pair = segment->GetJsonData(field_id, row_id);
|
||||
if (!json_pair.second) {
|
||||
auto f = [&]() {
|
||||
if constexpr (std::is_same_v<GetType, int64_t>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::INT32) &&
|
||||
type != uint8_t(milvus::index::JSONType::INT64) &&
|
||||
type != uint8_t(milvus::index::JSONType::FLOAT) &&
|
||||
type != uint8_t(milvus::index::JSONType::DOUBLE)) {
|
||||
return false;
|
||||
}
|
||||
} else if constexpr (std::is_same_v<GetType,
|
||||
std::string_view>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::STRING) &&
|
||||
type !=
|
||||
uint8_t(
|
||||
milvus::index::JSONType::STRING_ESCAPE)) {
|
||||
return false;
|
||||
}
|
||||
} else if constexpr (std::is_same_v<GetType, double>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::INT32) &&
|
||||
type != uint8_t(milvus::index::JSONType::INT64) &&
|
||||
type != uint8_t(milvus::index::JSONType::FLOAT) &&
|
||||
type != uint8_t(milvus::index::JSONType::DOUBLE)) {
|
||||
return false;
|
||||
}
|
||||
} else if constexpr (std::is_same_v<GetType, bool>) {
|
||||
if (type != uint8_t(milvus::index::JSONType::BOOL)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if constexpr (std::is_same_v<GetType, int64_t>) {
|
||||
return this->arg_set_->In(value);
|
||||
} else if constexpr (std::is_same_v<GetType, double>) {
|
||||
float restoredValue = *reinterpret_cast<float*>(&value);
|
||||
return this->arg_set_float_->In(restoredValue);
|
||||
} else if constexpr (std::is_same_v<GetType, bool>) {
|
||||
bool restoredValue = *reinterpret_cast<bool*>(&value);
|
||||
return this->arg_set_->In(restoredValue);
|
||||
}
|
||||
};
|
||||
bitset[row_id] = f();
|
||||
}
|
||||
|
||||
auto f = [&](const milvus::Json& json,
|
||||
uint8_t type,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
bool is_valid) {
|
||||
if (!is_valid) {
|
||||
return false;
|
||||
}
|
||||
auto& json = json_pair.first;
|
||||
if (type == uint8_t(milvus::index::JSONType::STRING) ||
|
||||
type == uint8_t(milvus::index::JSONType::DOUBLE) ||
|
||||
type == uint8_t(milvus::index::JSONType::INT64)) {
|
||||
@ -663,7 +685,19 @@ PhyTermFilterExpr::ExecJsonInVariableByKeyIndex() {
|
||||
}
|
||||
return this->arg_set_->In(ValueType(val.value()));
|
||||
}
|
||||
}
|
||||
};
|
||||
segment->BulkGetJsonData(
|
||||
field_id,
|
||||
[&](const milvus::Json& json, size_t i, bool is_valid) {
|
||||
auto type = type_array[i];
|
||||
auto row_id = invalid_row_ids[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
auto value = value_array[i];
|
||||
bitset[row_id] = f(json, type, offset, size, is_valid);
|
||||
},
|
||||
invalid_row_ids.data(),
|
||||
invalid_row_ids.size());
|
||||
};
|
||||
bool is_growing = segment_->type() == SegmentType::Growing;
|
||||
bool is_strong_consistency = consistency_level_ == 0;
|
||||
|
||||
@ -1223,40 +1223,59 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonForIndex() {
|
||||
op_type,
|
||||
val,
|
||||
arrayIndex,
|
||||
pointer](bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
int32_t value) {
|
||||
if (valid) {
|
||||
if (type == uint8_t(milvus::index::JSONType::UNKNOWN) ||
|
||||
!arrayIndex.empty()) {
|
||||
return false;
|
||||
pointer](const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
std::vector<int64_t> invalid_row_ids;
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto valid = valid_array[i];
|
||||
auto type = type_array[i];
|
||||
auto row_id = row_id_array[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
auto value = value_array[i];
|
||||
if (!valid) {
|
||||
invalid_row_ids.push_back(row_id);
|
||||
continue;
|
||||
}
|
||||
ISVALIDJSONTYPE(type, GetType);
|
||||
switch (op_type) {
|
||||
case proto::plan::GreaterThan:
|
||||
CompareValueWithOpType(type, value, val, op_type);
|
||||
case proto::plan::GreaterEqual:
|
||||
CompareValueWithOpType(type, value, val, op_type);
|
||||
case proto::plan::LessThan:
|
||||
CompareValueWithOpType(type, value, val, op_type);
|
||||
case proto::plan::LessEqual:
|
||||
CompareValueWithOpType(type, value, val, op_type);
|
||||
case proto::plan::Equal:
|
||||
CompareValueWithOpType(type, value, val, op_type);
|
||||
case proto::plan::NotEqual:
|
||||
CompareValueWithOpType(type, value, val, op_type);
|
||||
default:
|
||||
auto f = [&]() {
|
||||
if (type == uint8_t(milvus::index::JSONType::UNKNOWN) ||
|
||||
!arrayIndex.empty()) {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
auto json_pair = segment->GetJsonData(field_id, row_id);
|
||||
if (!json_pair.second) {
|
||||
}
|
||||
ISVALIDJSONTYPE(type, GetType);
|
||||
switch (op_type) {
|
||||
case proto::plan::GreaterThan:
|
||||
CompareValueWithOpType(type, value, val, op_type);
|
||||
case proto::plan::GreaterEqual:
|
||||
CompareValueWithOpType(type, value, val, op_type);
|
||||
case proto::plan::LessThan:
|
||||
CompareValueWithOpType(type, value, val, op_type);
|
||||
case proto::plan::LessEqual:
|
||||
CompareValueWithOpType(type, value, val, op_type);
|
||||
case proto::plan::Equal:
|
||||
CompareValueWithOpType(type, value, val, op_type);
|
||||
case proto::plan::NotEqual:
|
||||
CompareValueWithOpType(type, value, val, op_type);
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
};
|
||||
bitset[row_id] = f();
|
||||
}
|
||||
auto f = [&](const milvus::Json& json,
|
||||
uint8_t type,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
bool is_valid) {
|
||||
if (!is_valid) {
|
||||
return false;
|
||||
}
|
||||
auto& json = json_pair.first;
|
||||
switch (op_type) {
|
||||
case proto::plan::GreaterThan:
|
||||
if constexpr (std::is_same_v<GetType,
|
||||
@ -1422,7 +1441,19 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonForIndex() {
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
};
|
||||
segment->BulkGetJsonData(
|
||||
field_id,
|
||||
[&](const milvus::Json& json, size_t i, bool is_valid) {
|
||||
auto type = type_array[i];
|
||||
auto row_id = invalid_row_ids[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
auto value = value_array[i];
|
||||
bitset[row_id] = f(json, type, offset, size, is_valid);
|
||||
},
|
||||
invalid_row_ids.data(),
|
||||
invalid_row_ids.size());
|
||||
};
|
||||
bool is_growing = segment_->type() == SegmentType::Growing;
|
||||
bool is_strong_consistency = consistency_level_ == 0;
|
||||
|
||||
@ -59,17 +59,29 @@ class JsonKeyStatsInvertedIndex : public InvertedIndexTantivy<std::string> {
|
||||
BuildWithFieldData(const std::vector<FieldDataPtr>& datas, bool nullable);
|
||||
|
||||
const TargetBitmap
|
||||
FilterByPath(
|
||||
const std::string& path,
|
||||
int32_t row,
|
||||
bool is_growing,
|
||||
bool is_strong_consistency,
|
||||
std::function<bool(
|
||||
bool, uint8_t, uint32_t, uint16_t, uint16_t, int32_t)> filter) {
|
||||
FilterByPath(const std::string& path,
|
||||
int32_t row,
|
||||
bool is_growing,
|
||||
bool is_strong_consistency,
|
||||
std::function<void(const bool*,
|
||||
const uint8_t*,
|
||||
const uint32_t*,
|
||||
const uint16_t*,
|
||||
const uint16_t*,
|
||||
const int32_t*,
|
||||
TargetBitmap&,
|
||||
const size_t size)> filter) {
|
||||
auto processArray = [this, &path, row, &filter]() {
|
||||
TargetBitmap bitset(row);
|
||||
auto array = wrapper_->term_query_i64(path);
|
||||
LOG_INFO("json key filter size:{}", array.array_.len);
|
||||
folly::fbvector<bool> valid_array(array.array_.len);
|
||||
std::vector<uint8_t> type_array(array.array_.len);
|
||||
std::vector<uint32_t> row_id_array(array.array_.len);
|
||||
std::vector<uint16_t> offset_array(array.array_.len);
|
||||
std::vector<uint16_t> size_array(array.array_.len);
|
||||
std::vector<int32_t> value_array(array.array_.len);
|
||||
|
||||
for (size_t j = 0; j < array.array_.len; j++) {
|
||||
auto the_offset = array.array_.array[j];
|
||||
|
||||
@ -79,27 +91,37 @@ class JsonKeyStatsInvertedIndex : public InvertedIndexTantivy<std::string> {
|
||||
if (row_id >= row) {
|
||||
continue;
|
||||
}
|
||||
bitset[row_id] = filter(true,
|
||||
std::get<0>(tuple),
|
||||
std::get<1>(tuple),
|
||||
0,
|
||||
0,
|
||||
std::get<2>(tuple));
|
||||
|
||||
valid_array[j] = true;
|
||||
type_array[j] = std::get<0>(tuple);
|
||||
row_id_array[j] = std::get<1>(tuple);
|
||||
offset_array[j] = 0;
|
||||
size_array[j] = 0;
|
||||
value_array[j] = std::get<2>(tuple);
|
||||
} else {
|
||||
auto tuple = DecodeOffset(the_offset);
|
||||
auto row_id = std::get<1>(tuple);
|
||||
if (row_id >= row) {
|
||||
continue;
|
||||
}
|
||||
bitset[row_id] = filter(false,
|
||||
std::get<0>(tuple),
|
||||
std::get<1>(tuple),
|
||||
std::get<2>(tuple),
|
||||
std::get<3>(tuple),
|
||||
0);
|
||||
valid_array[j] = false;
|
||||
type_array[j] = std::get<0>(tuple);
|
||||
row_id_array[j] = std::get<1>(tuple);
|
||||
offset_array[j] = std::get<2>(tuple);
|
||||
size_array[j] = std::get<3>(tuple);
|
||||
value_array[j] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
filter(valid_array.data(),
|
||||
type_array.data(),
|
||||
row_id_array.data(),
|
||||
offset_array.data(),
|
||||
size_array.data(),
|
||||
value_array.data(),
|
||||
bitset,
|
||||
array.array_.len);
|
||||
|
||||
return bitset;
|
||||
};
|
||||
|
||||
|
||||
@ -332,24 +332,37 @@ class ChunkedVariableColumn : public ChunkedColumnBase {
|
||||
}
|
||||
}
|
||||
|
||||
Json
|
||||
RawJsonAt(size_t i) const override {
|
||||
void
|
||||
BulkRawJsonAt(std::function<void(Json, size_t, bool)> fn,
|
||||
const int64_t* offsets,
|
||||
int64_t count) const override {
|
||||
if constexpr (!std::is_same_v<T, Json>) {
|
||||
PanicInfo(
|
||||
ErrorCode::Unsupported,
|
||||
"RawJsonAt only supported for ChunkedVariableColumn<Json>");
|
||||
}
|
||||
if (i < 0 || i >= num_rows_) {
|
||||
PanicInfo(ErrorCode::OutOfRange, "index out of range");
|
||||
if (offsets == nullptr) {
|
||||
auto ca = SemiInlineGet(slot_->PinAllCells());
|
||||
for (int64_t i = 0; i < num_rows_; i++) {
|
||||
auto [chunk_id, offset_in_chunk] = GetChunkIDByOffset(i);
|
||||
auto chunk = ca->get_cell_of(chunk_id);
|
||||
auto valid = nullable_ ? chunk->isValid(offset_in_chunk) : true;
|
||||
auto str_view = static_cast<StringChunk*>(chunk)->operator[](
|
||||
offset_in_chunk);
|
||||
fn(Json(str_view.data(), str_view.size()), i, valid);
|
||||
}
|
||||
} else {
|
||||
auto [cids, offsets_in_chunk] = ToChunkIdAndOffset(offsets, count);
|
||||
auto ca = SemiInlineGet(slot_->PinCells(cids));
|
||||
for (int64_t i = 0; i < count; i++) {
|
||||
auto chunk = ca->get_cell_of(cids[i]);
|
||||
auto valid =
|
||||
nullable_ ? chunk->isValid(offsets_in_chunk[i]) : true;
|
||||
auto str_view = static_cast<StringChunk*>(chunk)->operator[](
|
||||
offsets_in_chunk[i]);
|
||||
fn(Json(str_view.data(), str_view.size()), i, valid);
|
||||
}
|
||||
}
|
||||
|
||||
auto [chunk_id, offset_in_chunk] = GetChunkIDByOffset(i);
|
||||
auto ca =
|
||||
SemiInlineGet(slot_->PinCells({static_cast<cid_t>(chunk_id)}));
|
||||
auto chunk = ca->get_cell_of(chunk_id);
|
||||
std::string_view str_view =
|
||||
static_cast<StringChunk*>(chunk)->operator[](offset_in_chunk);
|
||||
return Json(str_view.data(), str_view.size());
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@ -360,19 +360,45 @@ class ProxyChunkColumn : public ChunkedColumnInterface {
|
||||
}
|
||||
|
||||
// TODO(tiered storage 2): replace with Bulk version
|
||||
Json
|
||||
RawJsonAt(size_t i) const override {
|
||||
void
|
||||
BulkRawJsonAt(std::function<void(Json, size_t, bool)> fn,
|
||||
const int64_t* offsets,
|
||||
int64_t count) const override {
|
||||
if (data_type_ != DataType::JSON) {
|
||||
PanicInfo(
|
||||
ErrorCode::Unsupported,
|
||||
"RawJsonAt only supported for ProxyChunkColumn of Json type");
|
||||
}
|
||||
auto [chunk_id, offset_in_chunk] = GetChunkIDByOffset(i);
|
||||
auto group_chunk = group_->GetGroupChunk(chunk_id);
|
||||
auto chunk = group_chunk.get()->GetChunk(field_id_);
|
||||
std::string_view str_view =
|
||||
static_cast<StringChunk*>(chunk.get())->operator[](offset_in_chunk);
|
||||
return Json(str_view.data(), str_view.size());
|
||||
if (offsets == nullptr) {
|
||||
int64_t current_offset = 0;
|
||||
for (cid_t cid = 0; cid < num_chunks(); ++cid) {
|
||||
auto group_chunk = group_->GetGroupChunk(cid);
|
||||
auto chunk = group_chunk.get()->GetChunk(field_id_);
|
||||
auto chunk_rows = chunk->RowNums();
|
||||
for (int64_t i = 0; i < chunk_rows; ++i) {
|
||||
auto valid = chunk->isValid(i);
|
||||
auto str_view =
|
||||
static_cast<StringChunk*>(chunk.get())->operator[](i);
|
||||
fn(Json(str_view.data(), str_view.size()),
|
||||
current_offset + i,
|
||||
valid);
|
||||
}
|
||||
current_offset += chunk_rows;
|
||||
}
|
||||
} else {
|
||||
auto [cids, offsets_in_chunk] = ToChunkIdAndOffset(offsets, count);
|
||||
auto ca = group_->GetGroupChunks(cids);
|
||||
|
||||
for (int64_t i = 0; i < count; i++) {
|
||||
auto* group_chunk = ca->get_cell_of(cids[i]);
|
||||
auto chunk = group_chunk->GetChunk(field_id_);
|
||||
auto valid = chunk->isValid(offsets_in_chunk[i]);
|
||||
auto str_view = static_cast<StringChunk*>(chunk.get())
|
||||
->
|
||||
operator[](offsets_in_chunk[i]);
|
||||
fn(Json(str_view.data(), str_view.size()), i, valid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@ -119,8 +119,10 @@ class ChunkedColumnInterface {
|
||||
"variable length type");
|
||||
}
|
||||
|
||||
virtual Json
|
||||
RawJsonAt(size_t offset) const {
|
||||
virtual void
|
||||
BulkRawJsonAt(std::function<void(Json, size_t, bool)> fn,
|
||||
const int64_t* offsets = nullptr,
|
||||
int64_t count = 0) const {
|
||||
PanicInfo(
|
||||
ErrorCode::Unsupported,
|
||||
"RawJsonAt only supported for ChunkColumnInterface of Json type");
|
||||
|
||||
@ -1019,11 +1019,12 @@ ChunkedSegmentSealedImpl::bulk_subscript_ptr_impl(
|
||||
int64_t count,
|
||||
google::protobuf::RepeatedPtrField<std::string>* dst) {
|
||||
if constexpr (std::is_same_v<S, Json>) {
|
||||
for (int64_t i = 0; i < count; ++i) {
|
||||
auto offset = seg_offsets[i];
|
||||
Json json = column->RawJsonAt(offset);
|
||||
dst->at(i) = std::move(std::string(json.data()));
|
||||
}
|
||||
column->BulkRawJsonAt(
|
||||
[&](Json json, size_t offset, bool is_valid) {
|
||||
dst->at(offset) = std::move(std::string(json.data()));
|
||||
},
|
||||
seg_offsets,
|
||||
count);
|
||||
} else {
|
||||
static_assert(std::is_same_v<S, std::string>);
|
||||
column->BulkRawStringAt(
|
||||
@ -1443,12 +1444,13 @@ ChunkedSegmentSealedImpl::bulk_subscript(
|
||||
count);
|
||||
}
|
||||
auto dst = ret->mutable_scalars()->mutable_json_data()->mutable_data();
|
||||
for (int64_t i = 0; i < count; ++i) {
|
||||
auto offset = seg_offsets[i];
|
||||
Json json = column->RawJsonAt(offset);
|
||||
dst->at(i) =
|
||||
ExtractSubJson(std::string(json.data()), dynamic_field_names);
|
||||
}
|
||||
column->BulkRawJsonAt(
|
||||
[&](Json json, size_t offset, bool is_valid) {
|
||||
dst->at(offset) =
|
||||
ExtractSubJson(std::string(json.data()), dynamic_field_names);
|
||||
},
|
||||
seg_offsets,
|
||||
count);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -119,14 +119,13 @@ class ChunkedSegmentSealedImpl : public SegmentSealed {
|
||||
}
|
||||
|
||||
// TODO(tiered storage 1): should return a PinWrapper
|
||||
std::pair<milvus::Json, bool>
|
||||
GetJsonData(FieldId field_id, size_t offset) const override {
|
||||
void
|
||||
BulkGetJsonData(FieldId field_id,
|
||||
std::function<void(milvus::Json, size_t, bool)> fn,
|
||||
const int64_t* offsets,
|
||||
int64_t count) const override {
|
||||
auto column = fields_.at(field_id);
|
||||
bool is_valid = column->IsValid(offset);
|
||||
if (!is_valid) {
|
||||
return std::make_pair(milvus::Json(), false);
|
||||
}
|
||||
return std::make_pair(column->RawJsonAt(offset), is_valid);
|
||||
column->BulkRawJsonAt(fn, offsets, count);
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@ -1173,17 +1173,28 @@ SegmentGrowingImpl::CreateJSONIndex(FieldId field_id) {
|
||||
json_indexes_[field_id] = std::move(index);
|
||||
}
|
||||
|
||||
std::pair<milvus::Json, bool>
|
||||
SegmentGrowingImpl::GetJsonData(FieldId field_id, size_t offset) const {
|
||||
void
|
||||
SegmentGrowingImpl::BulkGetJsonData(
|
||||
FieldId field_id,
|
||||
std::function<void(milvus::Json, size_t, bool)> fn,
|
||||
const int64_t* offsets,
|
||||
int64_t count) const {
|
||||
auto vec_ptr = dynamic_cast<const ConcurrentVector<Json>*>(
|
||||
insert_record_.get_data_base(field_id));
|
||||
auto& src = *vec_ptr;
|
||||
auto& field_meta = schema_->operator[](field_id);
|
||||
if (field_meta.is_nullable()) {
|
||||
auto valid_data_ptr = insert_record_.get_valid_data(field_id);
|
||||
return std::make_pair(src[offset], valid_data_ptr->is_valid(offset));
|
||||
for (int64_t i = 0; i < count; ++i) {
|
||||
auto offset = offsets[i];
|
||||
fn(src[offset], i, valid_data_ptr->is_valid(offset));
|
||||
}
|
||||
} else {
|
||||
for (int64_t i = 0; i < count; ++i) {
|
||||
auto offset = offsets[i];
|
||||
fn(src[offset], i, true);
|
||||
}
|
||||
}
|
||||
return std::make_pair(src[offset], true);
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@ -253,8 +253,11 @@ class SegmentGrowingImpl : public SegmentGrowing {
|
||||
int64_t count,
|
||||
const std::vector<std::string>& dynamic_field_names) const override;
|
||||
|
||||
virtual std::pair<milvus::Json, bool>
|
||||
GetJsonData(FieldId field_id, size_t offset) const override;
|
||||
virtual void
|
||||
BulkGetJsonData(FieldId field_id,
|
||||
std::function<void(milvus::Json, size_t, bool)> fn,
|
||||
const int64_t* offsets,
|
||||
int64_t count) const override;
|
||||
|
||||
public:
|
||||
friend std::unique_ptr<SegmentGrowing>
|
||||
|
||||
@ -144,8 +144,11 @@ class SegmentInterface {
|
||||
virtual index::JsonKeyStatsInvertedIndex*
|
||||
GetJsonKeyIndex(FieldId field_id) const = 0;
|
||||
|
||||
virtual std::pair<milvus::Json, bool>
|
||||
GetJsonData(FieldId field_id, size_t offset) const = 0;
|
||||
virtual void
|
||||
BulkGetJsonData(FieldId field_id,
|
||||
std::function<void(milvus::Json, size_t, bool)> fn,
|
||||
const int64_t* offsets,
|
||||
int64_t count) const = 0;
|
||||
|
||||
virtual void
|
||||
LazyCheckSchema(const Schema& sch) = 0;
|
||||
|
||||
@ -214,13 +214,19 @@ TEST_P(JsonKeyStatsIndexTest, TestTermInFunc) {
|
||||
};
|
||||
std::unordered_set<int64_t> term_set(testcase.term.begin(),
|
||||
testcase.term.end());
|
||||
auto filter_func = [&term_set, this](bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
int32_t value) {
|
||||
return term_set.find(int64_t(value)) != term_set.end();
|
||||
auto filter_func = [&term_set, this](const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto value = value_array[i];
|
||||
bitset[row_id_array[i]] =
|
||||
term_set.find(int64_t(value)) != term_set.end();
|
||||
}
|
||||
};
|
||||
auto pointer = milvus::Json::pointer(testcase.nested_path);
|
||||
auto bitset =
|
||||
@ -292,27 +298,41 @@ TEST_P(JsonKeyStatsIndexTest, TestUnaryRangeInFunc) {
|
||||
}
|
||||
}
|
||||
|
||||
auto filter_func = [&op, &testcase, this](bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
int32_t value) {
|
||||
switch (op) {
|
||||
case OpType::GreaterThan:
|
||||
return int64_t(value) > testcase.val;
|
||||
case OpType::GreaterEqual:
|
||||
return int64_t(value) >= testcase.val;
|
||||
case OpType::LessThan:
|
||||
return int64_t(value) < testcase.val;
|
||||
case OpType::LessEqual:
|
||||
return int64_t(value) <= testcase.val;
|
||||
case OpType::Equal:
|
||||
return int64_t(value) == testcase.val;
|
||||
case OpType::NotEqual:
|
||||
return int64_t(value) != testcase.val;
|
||||
default:
|
||||
return false;
|
||||
auto filter_func = [&op, &testcase, this](
|
||||
const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto value = value_array[i];
|
||||
auto row_id = row_id_array[i];
|
||||
switch (op) {
|
||||
case OpType::GreaterThan:
|
||||
bitset[row_id] = value > testcase.val;
|
||||
break;
|
||||
case OpType::GreaterEqual:
|
||||
bitset[row_id] = value >= testcase.val;
|
||||
break;
|
||||
case OpType::LessThan:
|
||||
bitset[row_id] = value < testcase.val;
|
||||
break;
|
||||
case OpType::LessEqual:
|
||||
bitset[row_id] = value <= testcase.val;
|
||||
break;
|
||||
case OpType::Equal:
|
||||
bitset[row_id] = value == testcase.val;
|
||||
break;
|
||||
case OpType::NotEqual:
|
||||
bitset[row_id] = value != testcase.val;
|
||||
break;
|
||||
default:
|
||||
bitset[row_id] = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
auto pointer = milvus::Json::pointer(testcase.nested_path);
|
||||
@ -372,48 +392,62 @@ TEST_P(JsonKeyStatsIndexTest, TestBinaryRangeInFunc) {
|
||||
}
|
||||
};
|
||||
|
||||
auto filter_func = [&testcase, this](bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
int32_t value) {
|
||||
if (valid) {
|
||||
if (testcase.lower_inclusive && testcase.upper_inclusive) {
|
||||
return testcase.lower <= int64_t(value) &&
|
||||
int64_t(value) <= testcase.upper;
|
||||
} else if (testcase.lower_inclusive &&
|
||||
!testcase.upper_inclusive) {
|
||||
return testcase.lower <= int64_t(value) &&
|
||||
int64_t(value) < testcase.upper;
|
||||
} else if (!testcase.lower_inclusive &&
|
||||
testcase.upper_inclusive) {
|
||||
return testcase.lower < int64_t(value) &&
|
||||
int64_t(value) <= testcase.upper;
|
||||
auto filter_func = [&testcase, this](const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto valid = valid_array[i];
|
||||
if (valid) {
|
||||
auto value = value_array[i];
|
||||
auto row_id = row_id_array[i];
|
||||
if (testcase.lower_inclusive && testcase.upper_inclusive) {
|
||||
bitset[row_id] =
|
||||
testcase.lower <= value && value <= testcase.upper;
|
||||
} else if (testcase.lower_inclusive &&
|
||||
!testcase.upper_inclusive) {
|
||||
bitset[row_id] =
|
||||
testcase.lower <= value && value < testcase.upper;
|
||||
} else if (!testcase.lower_inclusive &&
|
||||
testcase.upper_inclusive) {
|
||||
bitset[row_id] =
|
||||
testcase.lower < value && value <= testcase.upper;
|
||||
} else {
|
||||
bitset[row_id] =
|
||||
testcase.lower < value && value < testcase.upper;
|
||||
}
|
||||
} else {
|
||||
return testcase.lower < int64_t(value) &&
|
||||
int64_t(value) < testcase.upper;
|
||||
}
|
||||
} else {
|
||||
auto val =
|
||||
this->data_[row_id].template at<int64_t>(offset, size);
|
||||
if (val.error()) {
|
||||
return false;
|
||||
}
|
||||
if (testcase.lower_inclusive && testcase.upper_inclusive) {
|
||||
return testcase.lower <= int64_t(val.value()) &&
|
||||
int64_t(val.value()) <= testcase.upper;
|
||||
} else if (testcase.lower_inclusive &&
|
||||
!testcase.upper_inclusive) {
|
||||
return testcase.lower <= int64_t(val.value()) &&
|
||||
int64_t(val.value()) < testcase.upper;
|
||||
} else if (!testcase.lower_inclusive &&
|
||||
testcase.upper_inclusive) {
|
||||
return testcase.lower < int64_t(val.value()) &&
|
||||
int64_t(val.value()) <= testcase.upper;
|
||||
} else {
|
||||
return testcase.lower < int64_t(val.value()) &&
|
||||
int64_t(val.value()) < testcase.upper;
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
auto row_id = row_id_array[i];
|
||||
auto val =
|
||||
this->data_[row_id].template at<int64_t>(offset, size);
|
||||
if (val.error()) {
|
||||
bitset[row_id] = false;
|
||||
}
|
||||
if (testcase.lower_inclusive && testcase.upper_inclusive) {
|
||||
bitset[row_id] =
|
||||
testcase.lower <= int64_t(val.value()) &&
|
||||
int64_t(val.value()) <= testcase.upper;
|
||||
} else if (testcase.lower_inclusive &&
|
||||
!testcase.upper_inclusive) {
|
||||
bitset[row_id] =
|
||||
testcase.lower <= int64_t(val.value()) &&
|
||||
int64_t(val.value()) < testcase.upper;
|
||||
} else if (!testcase.lower_inclusive &&
|
||||
testcase.upper_inclusive) {
|
||||
bitset[row_id] =
|
||||
testcase.lower < int64_t(val.value()) &&
|
||||
int64_t(val.value()) <= testcase.upper;
|
||||
} else {
|
||||
bitset[row_id] =
|
||||
testcase.lower < int64_t(val.value()) &&
|
||||
int64_t(val.value()) < testcase.upper;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
@ -452,13 +486,18 @@ TEST_P(JsonKeyStatsIndexTest, TestExistInFunc) {
|
||||
};
|
||||
for (const auto& testcase : testcases) {
|
||||
auto pointer = milvus::Json::pointer(testcase.nested_path);
|
||||
auto filter_func = [&pointer, this](bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
int32_t value) {
|
||||
return this->data_[row_id].exist(pointer);
|
||||
auto filter_func = [&pointer, this](const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto row_id = row_id_array[i];
|
||||
bitset[row_id] = this->data_[row_id].exist(pointer);
|
||||
}
|
||||
};
|
||||
|
||||
auto bitset =
|
||||
@ -501,25 +540,32 @@ TEST_P(JsonKeyStatsIndexTest, TestJsonContainsAllFunc) {
|
||||
for (auto const& element : testcase.term) {
|
||||
elements.insert(element);
|
||||
}
|
||||
auto filter_func = [&elements, this](bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
int32_t value) {
|
||||
auto array = this->data_[row_id].array_at(offset, size);
|
||||
std::unordered_set<int64_t> tmp_elements(elements);
|
||||
for (auto&& it : array) {
|
||||
auto val = it.template get<int64_t>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
auto filter_func = [&elements, this](const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto row_id = row_id_array[i];
|
||||
auto offset = offset_array[i];
|
||||
auto size = size_array[i];
|
||||
auto array = this->data_[row_id].array_at(offset, size);
|
||||
std::unordered_set<int64_t> tmp_elements(elements);
|
||||
for (auto&& it : array) {
|
||||
auto val = it.template get<int64_t>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
tmp_elements.erase(val.value());
|
||||
}
|
||||
tmp_elements.erase(val.value());
|
||||
if (tmp_elements.size() == 0) {
|
||||
return true;
|
||||
bitset[row_id] = true;
|
||||
}
|
||||
bitset[row_id] = tmp_elements.empty();
|
||||
}
|
||||
return tmp_elements.empty();
|
||||
};
|
||||
|
||||
auto bitset =
|
||||
@ -566,16 +612,18 @@ TEST(GrowingJsonKeyStatsIndexTest, GrowingIndex) {
|
||||
index->Commit();
|
||||
index->Reload();
|
||||
int64_t checkVal = 1;
|
||||
auto filter_func = [jsons, checkVal](bool valid,
|
||||
uint8_t type,
|
||||
uint32_t row_id,
|
||||
uint16_t offset,
|
||||
uint16_t size,
|
||||
int32_t value) {
|
||||
if (value == checkVal) {
|
||||
return true;
|
||||
auto filter_func = [jsons, checkVal](const bool* valid_array,
|
||||
const uint8_t* type_array,
|
||||
const uint32_t* row_id_array,
|
||||
const uint16_t* offset_array,
|
||||
const uint16_t* size_array,
|
||||
const int32_t* value_array,
|
||||
TargetBitmap& bitset,
|
||||
const size_t n) {
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
auto value = value_array[i];
|
||||
bitset[row_id_array[i]] = value == checkVal;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
auto pointer = milvus::Json::pointer({"int"});
|
||||
auto bitset =
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user