mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
enhance: enable stlsort with mmap support (#43359)
issue: https://github.com/milvus-io/milvus/issues/43358 --------- Signed-off-by: SpadeA <tangchenjie1210@gmail.com>
This commit is contained in:
parent
9bf1cb02d5
commit
864d1b93b1
@ -277,7 +277,8 @@ struct EvictionConfig {
|
||||
float loading_memory_factor = 2.5f)
|
||||
: cache_touch_window(std::chrono::milliseconds(cache_touch_window_ms)),
|
||||
eviction_interval(std::chrono::milliseconds(eviction_interval_ms)),
|
||||
overloaded_memory_threshold_percentage(overloaded_memory_threshold_percentage),
|
||||
overloaded_memory_threshold_percentage(
|
||||
overloaded_memory_threshold_percentage),
|
||||
max_disk_usage_percentage(max_disk_usage_percentage),
|
||||
loading_memory_factor(loading_memory_factor) {
|
||||
}
|
||||
|
||||
@ -70,7 +70,9 @@ initTelemetry(const TraceConfig& cfg) {
|
||||
auto headers_map = parseHeaders(cfg.otlpHeaders);
|
||||
if (!headers_map.empty()) {
|
||||
for (const auto& pair : headers_map) {
|
||||
opts.http_headers.insert(std::pair<std::string, std::string>(pair.first, pair.second));
|
||||
opts.http_headers.insert(
|
||||
std::pair<std::string, std::string>(pair.first,
|
||||
pair.second));
|
||||
}
|
||||
}
|
||||
exporter = otlp::OtlpHttpExporterFactory::Create(opts);
|
||||
@ -82,7 +84,8 @@ initTelemetry(const TraceConfig& cfg) {
|
||||
auto headers_map = parseHeaders(cfg.otlpHeaders);
|
||||
if (!headers_map.empty()) {
|
||||
for (const auto& pair : headers_map) {
|
||||
opts.metadata.insert(std::pair<std::string, std::string>(pair.first, pair.second));
|
||||
opts.metadata.insert(std::pair<std::string, std::string>(
|
||||
pair.first, pair.second));
|
||||
}
|
||||
}
|
||||
opts.use_ssl_credentials = cfg.oltpSecure;
|
||||
@ -296,14 +299,16 @@ std::map<std::string, std::string>
|
||||
parseHeaders(const std::string& headers) {
|
||||
if (headers.empty()) {
|
||||
return {};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
try {
|
||||
nlohmann::json json = nlohmann::json::parse(headers);
|
||||
return json.get<std::map<std::string, std::string>>();
|
||||
} catch (const std::exception& e) {
|
||||
// Log the parsing error and return empty map
|
||||
LOG_ERROR("Failed to parse headers as JSON: {}, error: {}", headers, e.what());
|
||||
LOG_ERROR("Failed to parse headers as JSON: {}, error: {}",
|
||||
headers,
|
||||
e.what());
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
@ -305,19 +305,17 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForData(EvalCtx& context) {
|
||||
TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size);
|
||||
|
||||
size_t processed_cursor = 0;
|
||||
auto execute_sub_batch = [lower_inclusive,
|
||||
upper_inclusive,
|
||||
&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type =
|
||||
FilterType::sequential>(
|
||||
const T* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
HighPrecisionType val1,
|
||||
HighPrecisionType val2) {
|
||||
auto execute_sub_batch =
|
||||
[ lower_inclusive, upper_inclusive, &processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
const T* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
HighPrecisionType val1,
|
||||
HighPrecisionType val2) {
|
||||
if (lower_inclusive && upper_inclusive) {
|
||||
BinaryRangeElementFunc<T, true, true, filter_type> func;
|
||||
func(val1,
|
||||
@ -449,20 +447,22 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(EvalCtx& context) {
|
||||
auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);
|
||||
|
||||
size_t processed_cursor = 0;
|
||||
auto execute_sub_batch = [lower_inclusive,
|
||||
upper_inclusive,
|
||||
pointer,
|
||||
&bitmap_input,
|
||||
&processed_cursor]<FilterType filter_type =
|
||||
FilterType::sequential>(
|
||||
const milvus::Json* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
ValueType val1,
|
||||
ValueType val2) {
|
||||
auto execute_sub_batch =
|
||||
[
|
||||
lower_inclusive,
|
||||
upper_inclusive,
|
||||
pointer,
|
||||
&bitmap_input,
|
||||
&processed_cursor
|
||||
]<FilterType filter_type = FilterType::sequential>(
|
||||
const milvus::Json* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
ValueType val1,
|
||||
ValueType val2) {
|
||||
if (lower_inclusive && upper_inclusive) {
|
||||
BinaryRangeElementFuncForJson<ValueType, true, true, filter_type>
|
||||
func;
|
||||
@ -859,20 +859,18 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(EvalCtx& context) {
|
||||
}
|
||||
|
||||
size_t processed_cursor = 0;
|
||||
auto execute_sub_batch = [lower_inclusive,
|
||||
upper_inclusive,
|
||||
&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type =
|
||||
FilterType::sequential>(
|
||||
const milvus::ArrayView* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
ValueType val1,
|
||||
ValueType val2,
|
||||
int index) {
|
||||
auto execute_sub_batch =
|
||||
[ lower_inclusive, upper_inclusive, &processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
const milvus::ArrayView* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
ValueType val1,
|
||||
ValueType val2,
|
||||
int index) {
|
||||
if (lower_inclusive && upper_inclusive) {
|
||||
BinaryRangeElementFuncForArray<ValueType, true, true, filter_type>
|
||||
func;
|
||||
|
||||
@ -231,8 +231,8 @@ PhyJsonContainsFilterExpr::ExecArrayContains(EvalCtx& context) {
|
||||
|
||||
int processed_cursor = 0;
|
||||
auto execute_sub_batch =
|
||||
[&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type = FilterType::sequential>(
|
||||
[&processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
const milvus::ArrayView* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
@ -240,32 +240,32 @@ PhyJsonContainsFilterExpr::ExecArrayContains(EvalCtx& context) {
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
const std::shared_ptr<MultiElement>& elements) {
|
||||
auto executor = [&](size_t i) {
|
||||
const auto& array = data[i];
|
||||
for (int j = 0; j < array.length(); ++j) {
|
||||
if (elements->In(array.template get_data<GetType>(j))) {
|
||||
return true;
|
||||
}
|
||||
auto executor = [&](size_t i) {
|
||||
const auto& array = data[i];
|
||||
for (int j = 0; j < array.length(); ++j) {
|
||||
if (elements->In(array.template get_data<GetType>(j))) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (int i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
res[i] = executor(offset);
|
||||
}
|
||||
processed_cursor += size;
|
||||
return false;
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (int i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
res[i] = executor(offset);
|
||||
}
|
||||
processed_cursor += size;
|
||||
};
|
||||
|
||||
int64_t processed_size;
|
||||
if (has_offset_input_) {
|
||||
@ -323,8 +323,8 @@ PhyJsonContainsFilterExpr::ExecJsonContains(EvalCtx& context) {
|
||||
|
||||
size_t processed_cursor = 0;
|
||||
auto execute_sub_batch =
|
||||
[&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type = FilterType::sequential>(
|
||||
[&processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
const milvus::Json* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
@ -333,40 +333,40 @@ PhyJsonContainsFilterExpr::ExecJsonContains(EvalCtx& context) {
|
||||
TargetBitmapView valid_res,
|
||||
const std::string& pointer,
|
||||
const std::shared_ptr<MultiElement>& elements) {
|
||||
auto executor = [&](size_t i) {
|
||||
auto doc = data[i].doc();
|
||||
auto array = doc.at_pointer(pointer).get_array();
|
||||
if (array.error()) {
|
||||
return false;
|
||||
}
|
||||
for (auto&& it : array) {
|
||||
auto val = it.template get<GetType>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (elements->In(val.value()) > 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
auto executor = [&](size_t i) {
|
||||
auto doc = data[i].doc();
|
||||
auto array = doc.at_pointer(pointer).get_array();
|
||||
if (array.error()) {
|
||||
return false;
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
res[i] = executor(offset);
|
||||
}
|
||||
processed_cursor += size;
|
||||
for (auto&& it : array) {
|
||||
auto val = it.template get<GetType>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (elements->In(val.value()) > 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
res[i] = executor(offset);
|
||||
}
|
||||
processed_cursor += size;
|
||||
};
|
||||
|
||||
int64_t processed_size;
|
||||
if (has_offset_input_) {
|
||||
@ -538,8 +538,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(EvalCtx& context) {
|
||||
|
||||
size_t processed_cursor = 0;
|
||||
auto execute_sub_batch =
|
||||
[&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type = FilterType::sequential>(
|
||||
[&processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
const milvus::Json* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
@ -548,49 +548,49 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(EvalCtx& context) {
|
||||
TargetBitmapView valid_res,
|
||||
const std::string& pointer,
|
||||
const std::vector<proto::plan::Array>& elements) {
|
||||
auto executor = [&](size_t i) -> bool {
|
||||
auto doc = data[i].doc();
|
||||
auto array = doc.at_pointer(pointer).get_array();
|
||||
if (array.error()) {
|
||||
return false;
|
||||
}
|
||||
for (auto&& it : array) {
|
||||
auto val = it.get_array();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
std::vector<
|
||||
simdjson::simdjson_result<simdjson::ondemand::value>>
|
||||
json_array;
|
||||
json_array.reserve(val.count_elements());
|
||||
for (auto&& e : val) {
|
||||
json_array.emplace_back(e);
|
||||
}
|
||||
for (auto const& element : elements) {
|
||||
if (CompareTwoJsonArray(json_array, element)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
auto executor = [&](size_t i) -> bool {
|
||||
auto doc = data[i].doc();
|
||||
auto array = doc.at_pointer(pointer).get_array();
|
||||
if (array.error()) {
|
||||
return false;
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
res[i] = executor(offset);
|
||||
}
|
||||
processed_cursor += size;
|
||||
for (auto&& it : array) {
|
||||
auto val = it.get_array();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
std::vector<
|
||||
simdjson::simdjson_result<simdjson::ondemand::value>>
|
||||
json_array;
|
||||
json_array.reserve(val.count_elements());
|
||||
for (auto&& e : val) {
|
||||
json_array.emplace_back(e);
|
||||
}
|
||||
for (auto const& element : elements) {
|
||||
if (CompareTwoJsonArray(json_array, element)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
res[i] = executor(offset);
|
||||
}
|
||||
processed_cursor += size;
|
||||
};
|
||||
|
||||
int64_t processed_size;
|
||||
if (has_offset_input_) {
|
||||
@ -759,8 +759,8 @@ PhyJsonContainsFilterExpr::ExecArrayContainsAll(EvalCtx& context) {
|
||||
|
||||
int processed_cursor = 0;
|
||||
auto execute_sub_batch =
|
||||
[&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type = FilterType::sequential>(
|
||||
[&processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
const milvus::ArrayView* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
@ -768,34 +768,34 @@ PhyJsonContainsFilterExpr::ExecArrayContainsAll(EvalCtx& context) {
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
const std::set<GetType>& elements) {
|
||||
auto executor = [&](size_t i) {
|
||||
std::set<GetType> tmp_elements(elements);
|
||||
// Note: array can only be iterated once
|
||||
for (int j = 0; j < data[i].length(); ++j) {
|
||||
tmp_elements.erase(data[i].template get_data<GetType>(j));
|
||||
if (tmp_elements.size() == 0) {
|
||||
return true;
|
||||
}
|
||||
auto executor = [&](size_t i) {
|
||||
std::set<GetType> tmp_elements(elements);
|
||||
// Note: array can only be iterated once
|
||||
for (int j = 0; j < data[i].length(); ++j) {
|
||||
tmp_elements.erase(data[i].template get_data<GetType>(j));
|
||||
if (tmp_elements.size() == 0) {
|
||||
return true;
|
||||
}
|
||||
return tmp_elements.size() == 0;
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (int i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
res[i] = executor(offset);
|
||||
}
|
||||
processed_cursor += size;
|
||||
return tmp_elements.size() == 0;
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (int i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
res[i] = executor(offset);
|
||||
}
|
||||
processed_cursor += size;
|
||||
};
|
||||
int64_t processed_size;
|
||||
if (has_offset_input_) {
|
||||
processed_size =
|
||||
@ -851,8 +851,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAll(EvalCtx& context) {
|
||||
|
||||
int processed_cursor = 0;
|
||||
auto execute_sub_batch =
|
||||
[&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type = FilterType::sequential>(
|
||||
[&processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
const milvus::Json* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
@ -861,43 +861,43 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAll(EvalCtx& context) {
|
||||
TargetBitmapView valid_res,
|
||||
const std::string& pointer,
|
||||
const std::set<GetType>& elements) {
|
||||
auto executor = [&](const size_t i) -> bool {
|
||||
auto doc = data[i].doc();
|
||||
auto array = doc.at_pointer(pointer).get_array();
|
||||
if (array.error()) {
|
||||
return false;
|
||||
}
|
||||
std::set<GetType> tmp_elements(elements);
|
||||
// Note: array can only be iterated once
|
||||
for (auto&& it : array) {
|
||||
auto val = it.template get<GetType>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
tmp_elements.erase(val.value());
|
||||
if (tmp_elements.size() == 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return tmp_elements.size() == 0;
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
res[i] = executor(offset);
|
||||
auto executor = [&](const size_t i) -> bool {
|
||||
auto doc = data[i].doc();
|
||||
auto array = doc.at_pointer(pointer).get_array();
|
||||
if (array.error()) {
|
||||
return false;
|
||||
}
|
||||
processed_cursor += size;
|
||||
std::set<GetType> tmp_elements(elements);
|
||||
// Note: array can only be iterated once
|
||||
for (auto&& it : array) {
|
||||
auto val = it.template get<GetType>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
tmp_elements.erase(val.value());
|
||||
if (tmp_elements.size() == 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return tmp_elements.size() == 0;
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
res[i] = executor(offset);
|
||||
}
|
||||
processed_cursor += size;
|
||||
};
|
||||
|
||||
int64_t processed_size;
|
||||
if (has_offset_input_) {
|
||||
@ -1072,8 +1072,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType(EvalCtx& context) {
|
||||
|
||||
int processed_cursor = 0;
|
||||
auto execute_sub_batch =
|
||||
[&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type = FilterType::sequential>(
|
||||
[&processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
const milvus::Json* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
@ -1083,104 +1083,102 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType(EvalCtx& context) {
|
||||
const std::string& pointer,
|
||||
const std::vector<proto::plan::GenericValue>& elements,
|
||||
const std::unordered_set<int> elements_index) {
|
||||
auto executor = [&](size_t i) -> bool {
|
||||
const auto& json = data[i];
|
||||
auto doc = json.doc();
|
||||
auto array = doc.at_pointer(pointer).get_array();
|
||||
if (array.error()) {
|
||||
return false;
|
||||
}
|
||||
std::unordered_set<int> tmp_elements_index(elements_index);
|
||||
for (auto&& it : array) {
|
||||
int i = -1;
|
||||
for (auto& element : elements) {
|
||||
i++;
|
||||
switch (element.val_case()) {
|
||||
case proto::plan::GenericValue::kBoolVal: {
|
||||
auto val = it.template get<bool>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (val.value() == element.bool_val()) {
|
||||
tmp_elements_index.erase(i);
|
||||
}
|
||||
break;
|
||||
auto executor = [&](size_t i) -> bool {
|
||||
const auto& json = data[i];
|
||||
auto doc = json.doc();
|
||||
auto array = doc.at_pointer(pointer).get_array();
|
||||
if (array.error()) {
|
||||
return false;
|
||||
}
|
||||
std::unordered_set<int> tmp_elements_index(elements_index);
|
||||
for (auto&& it : array) {
|
||||
int i = -1;
|
||||
for (auto& element : elements) {
|
||||
i++;
|
||||
switch (element.val_case()) {
|
||||
case proto::plan::GenericValue::kBoolVal: {
|
||||
auto val = it.template get<bool>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
case proto::plan::GenericValue::kInt64Val: {
|
||||
auto val = it.template get<int64_t>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (val.value() == element.int64_val()) {
|
||||
tmp_elements_index.erase(i);
|
||||
}
|
||||
break;
|
||||
if (val.value() == element.bool_val()) {
|
||||
tmp_elements_index.erase(i);
|
||||
}
|
||||
case proto::plan::GenericValue::kFloatVal: {
|
||||
auto val = it.template get<double>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (val.value() == element.float_val()) {
|
||||
tmp_elements_index.erase(i);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case proto::plan::GenericValue::kStringVal: {
|
||||
auto val = it.template get<std::string_view>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (val.value() == element.string_val()) {
|
||||
tmp_elements_index.erase(i);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case proto::plan::GenericValue::kArrayVal: {
|
||||
auto val = it.get_array();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (CompareTwoJsonArray(val,
|
||||
element.array_val())) {
|
||||
tmp_elements_index.erase(i);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
ThrowInfo(
|
||||
DataTypeInvalid,
|
||||
fmt::format("unsupported data type {}",
|
||||
element.val_case()));
|
||||
break;
|
||||
}
|
||||
if (tmp_elements_index.size() == 0) {
|
||||
return true;
|
||||
case proto::plan::GenericValue::kInt64Val: {
|
||||
auto val = it.template get<int64_t>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (val.value() == element.int64_val()) {
|
||||
tmp_elements_index.erase(i);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case proto::plan::GenericValue::kFloatVal: {
|
||||
auto val = it.template get<double>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (val.value() == element.float_val()) {
|
||||
tmp_elements_index.erase(i);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case proto::plan::GenericValue::kStringVal: {
|
||||
auto val = it.template get<std::string_view>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (val.value() == element.string_val()) {
|
||||
tmp_elements_index.erase(i);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case proto::plan::GenericValue::kArrayVal: {
|
||||
auto val = it.get_array();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (CompareTwoJsonArray(val, element.array_val())) {
|
||||
tmp_elements_index.erase(i);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
ThrowInfo(DataTypeInvalid,
|
||||
fmt::format("unsupported data type {}",
|
||||
element.val_case()));
|
||||
}
|
||||
if (tmp_elements_index.size() == 0) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return tmp_elements_index.size() == 0;
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
if (tmp_elements_index.size() == 0) {
|
||||
return true;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
res[i] = executor(offset);
|
||||
}
|
||||
processed_cursor += size;
|
||||
return tmp_elements_index.size() == 0;
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
res[i] = executor(offset);
|
||||
}
|
||||
processed_cursor += size;
|
||||
};
|
||||
|
||||
int64_t processed_size;
|
||||
if (has_offset_input_) {
|
||||
@ -1413,8 +1411,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(EvalCtx& context) {
|
||||
|
||||
size_t processed_cursor = 0;
|
||||
auto execute_sub_batch =
|
||||
[&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type = FilterType::sequential>(
|
||||
[&processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
const milvus::Json* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
@ -1423,54 +1421,54 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(EvalCtx& context) {
|
||||
TargetBitmapView valid_res,
|
||||
const std::string& pointer,
|
||||
const std::vector<proto::plan::Array>& elements) {
|
||||
auto executor = [&](const size_t i) {
|
||||
auto doc = data[i].doc();
|
||||
auto array = doc.at_pointer(pointer).get_array();
|
||||
if (array.error()) {
|
||||
return false;
|
||||
}
|
||||
std::unordered_set<int> exist_elements_index;
|
||||
for (auto&& it : array) {
|
||||
auto val = it.get_array();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
std::vector<
|
||||
simdjson::simdjson_result<simdjson::ondemand::value>>
|
||||
json_array;
|
||||
json_array.reserve(val.count_elements());
|
||||
for (auto&& e : val) {
|
||||
json_array.emplace_back(e);
|
||||
}
|
||||
for (int index = 0; index < elements.size(); ++index) {
|
||||
if (CompareTwoJsonArray(json_array, elements[index])) {
|
||||
exist_elements_index.insert(index);
|
||||
}
|
||||
}
|
||||
if (exist_elements_index.size() == elements.size()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return exist_elements_index.size() == elements.size();
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
res[i] = executor(offset);
|
||||
auto executor = [&](const size_t i) {
|
||||
auto doc = data[i].doc();
|
||||
auto array = doc.at_pointer(pointer).get_array();
|
||||
if (array.error()) {
|
||||
return false;
|
||||
}
|
||||
processed_cursor += size;
|
||||
std::unordered_set<int> exist_elements_index;
|
||||
for (auto&& it : array) {
|
||||
auto val = it.get_array();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
std::vector<
|
||||
simdjson::simdjson_result<simdjson::ondemand::value>>
|
||||
json_array;
|
||||
json_array.reserve(val.count_elements());
|
||||
for (auto&& e : val) {
|
||||
json_array.emplace_back(e);
|
||||
}
|
||||
for (int index = 0; index < elements.size(); ++index) {
|
||||
if (CompareTwoJsonArray(json_array, elements[index])) {
|
||||
exist_elements_index.insert(index);
|
||||
}
|
||||
}
|
||||
if (exist_elements_index.size() == elements.size()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return exist_elements_index.size() == elements.size();
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
res[i] = executor(offset);
|
||||
}
|
||||
processed_cursor += size;
|
||||
};
|
||||
|
||||
int64_t processed_size;
|
||||
if (has_offset_input_) {
|
||||
@ -1645,8 +1643,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(EvalCtx& context) {
|
||||
|
||||
size_t processed_cursor = 0;
|
||||
auto execute_sub_batch =
|
||||
[&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type = FilterType::sequential>(
|
||||
[&processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
const milvus::Json* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
@ -1655,96 +1653,94 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(EvalCtx& context) {
|
||||
TargetBitmapView valid_res,
|
||||
const std::string& pointer,
|
||||
const std::vector<proto::plan::GenericValue>& elements) {
|
||||
auto executor = [&](const size_t i) {
|
||||
auto& json = data[i];
|
||||
auto doc = json.doc();
|
||||
auto array = doc.at_pointer(pointer).get_array();
|
||||
if (array.error()) {
|
||||
return false;
|
||||
}
|
||||
// Note: array can only be iterated once
|
||||
for (auto&& it : array) {
|
||||
for (auto const& element : elements) {
|
||||
switch (element.val_case()) {
|
||||
case proto::plan::GenericValue::kBoolVal: {
|
||||
auto val = it.template get<bool>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (val.value() == element.bool_val()) {
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
auto executor = [&](const size_t i) {
|
||||
auto& json = data[i];
|
||||
auto doc = json.doc();
|
||||
auto array = doc.at_pointer(pointer).get_array();
|
||||
if (array.error()) {
|
||||
return false;
|
||||
}
|
||||
// Note: array can only be iterated once
|
||||
for (auto&& it : array) {
|
||||
for (auto const& element : elements) {
|
||||
switch (element.val_case()) {
|
||||
case proto::plan::GenericValue::kBoolVal: {
|
||||
auto val = it.template get<bool>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
case proto::plan::GenericValue::kInt64Val: {
|
||||
auto val = it.template get<int64_t>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (val.value() == element.int64_val()) {
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
if (val.value() == element.bool_val()) {
|
||||
return true;
|
||||
}
|
||||
case proto::plan::GenericValue::kFloatVal: {
|
||||
auto val = it.template get<double>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (val.value() == element.float_val()) {
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case proto::plan::GenericValue::kStringVal: {
|
||||
auto val = it.template get<std::string_view>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (val.value() == element.string_val()) {
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case proto::plan::GenericValue::kArrayVal: {
|
||||
auto val = it.get_array();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (CompareTwoJsonArray(val,
|
||||
element.array_val())) {
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
ThrowInfo(
|
||||
DataTypeInvalid,
|
||||
fmt::format("unsupported data type {}",
|
||||
element.val_case()));
|
||||
break;
|
||||
}
|
||||
case proto::plan::GenericValue::kInt64Val: {
|
||||
auto val = it.template get<int64_t>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (val.value() == element.int64_val()) {
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case proto::plan::GenericValue::kFloatVal: {
|
||||
auto val = it.template get<double>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (val.value() == element.float_val()) {
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case proto::plan::GenericValue::kStringVal: {
|
||||
auto val = it.template get<std::string_view>();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (val.value() == element.string_val()) {
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case proto::plan::GenericValue::kArrayVal: {
|
||||
auto val = it.get_array();
|
||||
if (val.error()) {
|
||||
continue;
|
||||
}
|
||||
if (CompareTwoJsonArray(val, element.array_val())) {
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
ThrowInfo(DataTypeInvalid,
|
||||
fmt::format("unsupported data type {}",
|
||||
element.val_case()));
|
||||
}
|
||||
}
|
||||
return false;
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
res[i] = executor(offset);
|
||||
}
|
||||
processed_cursor += size;
|
||||
return false;
|
||||
};
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
for (size_t i = 0; i < size; ++i) {
|
||||
auto offset = i;
|
||||
if constexpr (filter_type == FilterType::random) {
|
||||
offset = (offsets) ? offsets[i] : i;
|
||||
}
|
||||
if (valid_data != nullptr && !valid_data[offset]) {
|
||||
res[i] = valid_res[i] = false;
|
||||
continue;
|
||||
}
|
||||
if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
res[i] = executor(offset);
|
||||
}
|
||||
processed_cursor += size;
|
||||
};
|
||||
|
||||
int64_t processed_size;
|
||||
if (has_offset_input_) {
|
||||
|
||||
@ -329,9 +329,8 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(EvalCtx& context) {
|
||||
}
|
||||
int processed_cursor = 0;
|
||||
auto execute_sub_batch =
|
||||
[op_type,
|
||||
&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type = FilterType::sequential>(
|
||||
[ op_type, &processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
const milvus::ArrayView* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
@ -340,186 +339,185 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(EvalCtx& context) {
|
||||
TargetBitmapView valid_res,
|
||||
ValueType val,
|
||||
int index) {
|
||||
switch (op_type) {
|
||||
case proto::plan::GreaterThan: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::GreaterThan,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::GreaterEqual: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::GreaterEqual,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::LessThan: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::LessThan,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::LessEqual: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::LessEqual,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::Equal: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::Equal,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::NotEqual: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::NotEqual,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::PrefixMatch: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::PrefixMatch,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::Match: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::Match,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::PostfixMatch: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::PostfixMatch,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::InnerMatch: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::InnerMatch,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
ThrowInfo(
|
||||
OpTypeInvalid,
|
||||
fmt::format(
|
||||
"unsupported operator type for unary expr: {}",
|
||||
op_type));
|
||||
switch (op_type) {
|
||||
case proto::plan::GreaterThan: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::GreaterThan,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
processed_cursor += size;
|
||||
};
|
||||
case proto::plan::GreaterEqual: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::GreaterEqual,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::LessThan: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::LessThan,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::LessEqual: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::LessEqual,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::Equal: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::Equal,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::NotEqual: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::NotEqual,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::PrefixMatch: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::PrefixMatch,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::Match: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::Match,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::PostfixMatch: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::PostfixMatch,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
case proto::plan::InnerMatch: {
|
||||
UnaryElementFuncForArray<ValueType,
|
||||
proto::plan::InnerMatch,
|
||||
filter_type>
|
||||
func;
|
||||
func(data,
|
||||
valid_data,
|
||||
size,
|
||||
val,
|
||||
index,
|
||||
res,
|
||||
valid_res,
|
||||
bitmap_input,
|
||||
processed_cursor,
|
||||
offsets);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
ThrowInfo(
|
||||
OpTypeInvalid,
|
||||
fmt::format("unsupported operator type for unary expr: {}",
|
||||
op_type));
|
||||
}
|
||||
processed_cursor += size;
|
||||
};
|
||||
int64_t processed_size;
|
||||
if (has_offset_input_) {
|
||||
processed_size =
|
||||
@ -719,18 +717,16 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(EvalCtx& context) {
|
||||
} while (false)
|
||||
|
||||
int processed_cursor = 0;
|
||||
auto execute_sub_batch = [op_type,
|
||||
pointer,
|
||||
&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type =
|
||||
FilterType::sequential>(
|
||||
const milvus::Json* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
ExprValueType val) {
|
||||
auto execute_sub_batch =
|
||||
[ op_type, pointer, &processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
const milvus::Json* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
ExprValueType val) {
|
||||
bool has_bitmap_input = !bitmap_input.empty();
|
||||
switch (op_type) {
|
||||
case proto::plan::GreaterThan: {
|
||||
@ -1702,17 +1698,16 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(EvalCtx& context) {
|
||||
auto expr_type = expr_->op_type_;
|
||||
|
||||
size_t processed_cursor = 0;
|
||||
auto execute_sub_batch = [expr_type,
|
||||
&processed_cursor,
|
||||
&bitmap_input]<FilterType filter_type =
|
||||
FilterType::sequential>(
|
||||
const T* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
IndexInnerType val) {
|
||||
auto execute_sub_batch =
|
||||
[ expr_type, &processed_cursor, &
|
||||
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
|
||||
const T* data,
|
||||
const bool* valid_data,
|
||||
const int32_t* offsets,
|
||||
const int size,
|
||||
TargetBitmapView res,
|
||||
TargetBitmapView valid_res,
|
||||
IndexInnerType val) {
|
||||
switch (expr_type) {
|
||||
case proto::plan::GreaterThan: {
|
||||
UnaryElementFunc<T, proto::plan::GreaterThan, filter_type> func;
|
||||
@ -1968,7 +1963,8 @@ PhyUnaryRangeFilterExpr::ExecTextMatch() {
|
||||
|
||||
TargetBitmap result;
|
||||
TargetBitmap valid_result;
|
||||
result.append(*cached_match_res_, current_data_global_pos_, real_batch_size);
|
||||
result.append(
|
||||
*cached_match_res_, current_data_global_pos_, real_batch_size);
|
||||
valid_result.append(cached_index_chunk_valid_res_,
|
||||
current_data_global_pos_,
|
||||
real_batch_size);
|
||||
|
||||
@ -467,7 +467,8 @@ BitmapIndex<T>::MMapIndexData(const std::string& file_name,
|
||||
T key = ParseKey(&data_ptr);
|
||||
|
||||
roaring::Roaring value;
|
||||
value = roaring::Roaring::read(reinterpret_cast<const char*>(data_ptr));
|
||||
value =
|
||||
roaring::Roaring::read(reinterpret_cast<const char*>(data_ptr));
|
||||
for (const auto& v : value) {
|
||||
valid_bitset_.set(v);
|
||||
}
|
||||
|
||||
@ -107,7 +107,7 @@ HybridScalarIndex<T>::SelectBuildTypeForPrimitiveType(
|
||||
// Decide whether to select bitmap index or inverted sort
|
||||
if (distinct_vals.size() >= bitmap_index_cardinality_limit_) {
|
||||
if constexpr (std::is_integral_v<T>) {
|
||||
internal_index_type_ = ScalarIndexType::STLSORT;
|
||||
internal_index_type_ = ScalarIndexType::STLSORT;
|
||||
} else {
|
||||
internal_index_type_ = ScalarIndexType::INVERTED;
|
||||
}
|
||||
|
||||
@ -34,14 +34,23 @@
|
||||
|
||||
namespace milvus::index {
|
||||
|
||||
const std::string MMAP_PATH_FOR_TEST = "/tmp/milvus/mmap_test";
|
||||
|
||||
const std::string STLSORT_INDEX_FILE_NAME = "stlsort-index";
|
||||
|
||||
constexpr size_t ALIGNMENT = 32; // 32-byte alignment
|
||||
|
||||
template <typename T>
|
||||
ScalarIndexSort<T>::ScalarIndexSort(
|
||||
const storage::FileManagerContext& file_manager_context)
|
||||
: ScalarIndex<T>(ASCENDING_SORT), is_built_(false), data_() {
|
||||
// not valid means we are in unit test
|
||||
if (file_manager_context.Valid()) {
|
||||
field_id_ = file_manager_context.fieldDataMeta.field_id;
|
||||
file_manager_ =
|
||||
std::make_shared<storage::MemFileManagerImpl>(file_manager_context);
|
||||
AssertInfo(file_manager_ != nullptr, "create file manager failed!");
|
||||
disk_file_manager_ = std::make_shared<storage::DiskFileManagerImpl>(
|
||||
file_manager_context);
|
||||
}
|
||||
}
|
||||
|
||||
@ -65,6 +74,8 @@ ScalarIndexSort<T>::Build(size_t n, const T* values, const bool* valid_data) {
|
||||
if (n == 0) {
|
||||
ThrowInfo(DataIsEmpty, "ScalarIndexSort cannot build null values!");
|
||||
}
|
||||
index_build_begin_ = std::chrono::system_clock::now();
|
||||
|
||||
data_.reserve(n);
|
||||
total_num_rows_ = n;
|
||||
valid_bitset_ = TargetBitmap(total_num_rows_, false);
|
||||
@ -83,12 +94,16 @@ ScalarIndexSort<T>::Build(size_t n, const T* values, const bool* valid_data) {
|
||||
idx_to_offsets_[data_[i].idx_] = i;
|
||||
}
|
||||
is_built_ = true;
|
||||
|
||||
setup_data_pointers();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void
|
||||
ScalarIndexSort<T>::BuildWithFieldData(
|
||||
const std::vector<milvus::FieldDataPtr>& field_datas) {
|
||||
index_build_begin_ = std::chrono::system_clock::now();
|
||||
|
||||
int64_t length = 0;
|
||||
for (const auto& data : field_datas) {
|
||||
total_num_rows_ += data->get_num_rows();
|
||||
@ -122,6 +137,8 @@ ScalarIndexSort<T>::BuildWithFieldData(
|
||||
idx_to_offsets_[data_[i].idx_] = i;
|
||||
}
|
||||
is_built_ = true;
|
||||
|
||||
setup_data_pointers();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
@ -153,6 +170,15 @@ ScalarIndexSort<T>::Serialize(const Config& config) {
|
||||
template <typename T>
|
||||
IndexStatsPtr
|
||||
ScalarIndexSort<T>::Upload(const Config& config) {
|
||||
auto index_build_duration =
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now() - index_build_begin_)
|
||||
.count();
|
||||
LOG_INFO(
|
||||
"index build done for ScalarIndexSort, field_id: {}, duration: {}ms",
|
||||
field_id_,
|
||||
index_build_duration);
|
||||
|
||||
auto binary_set = Serialize(config);
|
||||
file_manager_->AddFile(binary_set);
|
||||
|
||||
@ -169,8 +195,56 @@ ScalarIndexSort<T>::LoadWithoutAssemble(const BinarySet& index_binary,
|
||||
auto index_length = index_binary.GetByName("index_length");
|
||||
memcpy(&index_size, index_length->data.get(), (size_t)index_length->size);
|
||||
|
||||
is_mmap_ = GetValueFromConfig<bool>(config, ENABLE_MMAP).value_or(true);
|
||||
|
||||
auto index_data = index_binary.GetByName("index_data");
|
||||
data_.resize(index_size);
|
||||
|
||||
if (is_mmap_) {
|
||||
// some test may pass invalid file_manager_context in constructor which results in a nullptr disk_file_manager_
|
||||
mmap_filepath_ = disk_file_manager_ != nullptr
|
||||
? disk_file_manager_->GetLocalIndexObjectPrefix() +
|
||||
STLSORT_INDEX_FILE_NAME
|
||||
: MMAP_PATH_FOR_TEST;
|
||||
std::filesystem::create_directories(
|
||||
std::filesystem::path(mmap_filepath_).parent_path());
|
||||
|
||||
auto aligned_size =
|
||||
((index_data->size + ALIGNMENT - 1) / ALIGNMENT) * ALIGNMENT;
|
||||
{
|
||||
auto file_writer = storage::FileWriter(mmap_filepath_);
|
||||
file_writer.Write(index_data->data.get(), (size_t)index_data->size);
|
||||
|
||||
if (aligned_size > index_data->size) {
|
||||
std::vector<uint8_t> padding(aligned_size - index_data->size,
|
||||
0);
|
||||
file_writer.Write(padding.data(), padding.size());
|
||||
}
|
||||
file_writer.Finish();
|
||||
}
|
||||
|
||||
auto file = File::Open(mmap_filepath_, O_RDONLY);
|
||||
mmap_data_ = static_cast<char*>(mmap(
|
||||
NULL, aligned_size, PROT_READ, MAP_PRIVATE, file.Descriptor(), 0));
|
||||
|
||||
if (mmap_data_ == MAP_FAILED) {
|
||||
file.Close();
|
||||
remove(mmap_filepath_.c_str());
|
||||
ThrowInfo(ErrorCode::UnexpectedError,
|
||||
"failed to mmap: {}",
|
||||
strerror(errno));
|
||||
}
|
||||
|
||||
mmap_size_ = aligned_size;
|
||||
data_size_ = index_data->size;
|
||||
|
||||
file.Close();
|
||||
} else {
|
||||
data_.resize(index_size);
|
||||
memcpy(data_.data(), index_data->data.get(), (size_t)index_data->size);
|
||||
}
|
||||
|
||||
setup_data_pointers();
|
||||
|
||||
auto index_num_rows = index_binary.GetByName("index_num_rows");
|
||||
if (index_num_rows) {
|
||||
memcpy(&total_num_rows_,
|
||||
@ -182,13 +256,18 @@ ScalarIndexSort<T>::LoadWithoutAssemble(const BinarySet& index_binary,
|
||||
|
||||
idx_to_offsets_.resize(total_num_rows_);
|
||||
valid_bitset_ = TargetBitmap(total_num_rows_, false);
|
||||
memcpy(data_.data(), index_data->data.get(), (size_t)index_data->size);
|
||||
for (size_t i = 0; i < data_.size(); ++i) {
|
||||
idx_to_offsets_[data_[i].idx_] = i;
|
||||
valid_bitset_.set(data_[i].idx_);
|
||||
|
||||
for (size_t i = 0; i < Size(); ++i) {
|
||||
const auto& item = operator[](i);
|
||||
idx_to_offsets_[item.idx_] = i;
|
||||
valid_bitset_.set(item.idx_);
|
||||
}
|
||||
|
||||
is_built_ = true;
|
||||
|
||||
LOG_INFO("load ScalarIndexSort done, field_id: {}, is_mmap:{}",
|
||||
field_id_,
|
||||
is_mmap_);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
@ -221,10 +300,10 @@ ScalarIndexSort<T>::In(const size_t n, const T* values) {
|
||||
AssertInfo(is_built_, "index has not been built");
|
||||
TargetBitmap bitset(Count());
|
||||
for (size_t i = 0; i < n; ++i) {
|
||||
auto lb = std::lower_bound(
|
||||
data_.begin(), data_.end(), IndexStructure<T>(*(values + i)));
|
||||
auto ub = std::upper_bound(
|
||||
data_.begin(), data_.end(), IndexStructure<T>(*(values + i)));
|
||||
auto lb =
|
||||
std::lower_bound(begin(), end(), IndexStructure<T>(*(values + i)));
|
||||
auto ub =
|
||||
std::upper_bound(begin(), end(), IndexStructure<T>(*(values + i)));
|
||||
for (; lb < ub; ++lb) {
|
||||
if (lb->a_ != *(values + i)) {
|
||||
std::cout << "error happens in ScalarIndexSort<T>::In, "
|
||||
@ -243,10 +322,10 @@ ScalarIndexSort<T>::NotIn(const size_t n, const T* values) {
|
||||
AssertInfo(is_built_, "index has not been built");
|
||||
TargetBitmap bitset(Count(), true);
|
||||
for (size_t i = 0; i < n; ++i) {
|
||||
auto lb = std::lower_bound(
|
||||
data_.begin(), data_.end(), IndexStructure<T>(*(values + i)));
|
||||
auto ub = std::upper_bound(
|
||||
data_.begin(), data_.end(), IndexStructure<T>(*(values + i)));
|
||||
auto lb =
|
||||
std::lower_bound(begin(), end(), IndexStructure<T>(*(values + i)));
|
||||
auto ub =
|
||||
std::upper_bound(begin(), end(), IndexStructure<T>(*(values + i)));
|
||||
for (; lb < ub; ++lb) {
|
||||
if (lb->a_ != *(values + i)) {
|
||||
std::cout << "error happens in ScalarIndexSort<T>::NotIn, "
|
||||
@ -285,27 +364,23 @@ const TargetBitmap
|
||||
ScalarIndexSort<T>::Range(const T value, const OpType op) {
|
||||
AssertInfo(is_built_, "index has not been built");
|
||||
TargetBitmap bitset(Count());
|
||||
auto lb = data_.begin();
|
||||
auto ub = data_.end();
|
||||
auto lb = begin();
|
||||
auto ub = end();
|
||||
if (ShouldSkip(value, value, op)) {
|
||||
return bitset;
|
||||
}
|
||||
switch (op) {
|
||||
case OpType::LessThan:
|
||||
ub = std::lower_bound(
|
||||
data_.begin(), data_.end(), IndexStructure<T>(value));
|
||||
ub = std::lower_bound(begin(), end(), IndexStructure<T>(value));
|
||||
break;
|
||||
case OpType::LessEqual:
|
||||
ub = std::upper_bound(
|
||||
data_.begin(), data_.end(), IndexStructure<T>(value));
|
||||
ub = std::upper_bound(begin(), end(), IndexStructure<T>(value));
|
||||
break;
|
||||
case OpType::GreaterThan:
|
||||
lb = std::upper_bound(
|
||||
data_.begin(), data_.end(), IndexStructure<T>(value));
|
||||
lb = std::upper_bound(begin(), end(), IndexStructure<T>(value));
|
||||
break;
|
||||
case OpType::GreaterEqual:
|
||||
lb = std::lower_bound(
|
||||
data_.begin(), data_.end(), IndexStructure<T>(value));
|
||||
lb = std::lower_bound(begin(), end(), IndexStructure<T>(value));
|
||||
break;
|
||||
default:
|
||||
ThrowInfo(OpTypeInvalid,
|
||||
@ -333,21 +408,21 @@ ScalarIndexSort<T>::Range(T lower_bound_value,
|
||||
if (ShouldSkip(lower_bound_value, upper_bound_value, OpType::Range)) {
|
||||
return bitset;
|
||||
}
|
||||
auto lb = data_.begin();
|
||||
auto ub = data_.end();
|
||||
auto lb = begin();
|
||||
auto ub = end();
|
||||
if (lb_inclusive) {
|
||||
lb = std::lower_bound(
|
||||
data_.begin(), data_.end(), IndexStructure<T>(lower_bound_value));
|
||||
begin(), end(), IndexStructure<T>(lower_bound_value));
|
||||
} else {
|
||||
lb = std::upper_bound(
|
||||
data_.begin(), data_.end(), IndexStructure<T>(lower_bound_value));
|
||||
begin(), end(), IndexStructure<T>(lower_bound_value));
|
||||
}
|
||||
if (ub_inclusive) {
|
||||
ub = std::upper_bound(
|
||||
data_.begin(), data_.end(), IndexStructure<T>(upper_bound_value));
|
||||
begin(), end(), IndexStructure<T>(upper_bound_value));
|
||||
} else {
|
||||
ub = std::lower_bound(
|
||||
data_.begin(), data_.end(), IndexStructure<T>(upper_bound_value));
|
||||
begin(), end(), IndexStructure<T>(upper_bound_value));
|
||||
}
|
||||
for (; lb < ub; ++lb) {
|
||||
bitset[lb->idx_] = true;
|
||||
@ -365,7 +440,7 @@ ScalarIndexSort<T>::Reverse_Lookup(size_t idx) const {
|
||||
return std::nullopt;
|
||||
}
|
||||
auto offset = idx_to_offsets_[idx];
|
||||
return data_[offset].a_;
|
||||
return operator[](offset).a_;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
@ -373,9 +448,9 @@ bool
|
||||
ScalarIndexSort<T>::ShouldSkip(const T lower_value,
|
||||
const T upper_value,
|
||||
const milvus::OpType op) {
|
||||
if (!data_.empty()) {
|
||||
auto lower_bound = data_.begin();
|
||||
auto upper_bound = data_.rbegin();
|
||||
if (!Empty()) {
|
||||
auto lower_bound = begin();
|
||||
auto upper_bound = rbegin();
|
||||
bool shouldSkip = false;
|
||||
switch (op) {
|
||||
case OpType::LessThan: {
|
||||
|
||||
@ -26,6 +26,17 @@
|
||||
#include "index/IndexStructure.h"
|
||||
#include "index/ScalarIndex.h"
|
||||
#include "storage/MemFileManagerImpl.h"
|
||||
#include "storage/DiskFileManagerImpl.h"
|
||||
#include "storage/FileWriter.h"
|
||||
#include "common/File.h"
|
||||
|
||||
#if defined(__clang__) || defined(__GNUC__)
|
||||
#define ALWAYS_INLINE inline __attribute__((always_inline))
|
||||
#elif defined(_MSC_VER)
|
||||
#define ALWAYS_INLINE __forceinline
|
||||
#else
|
||||
#define ALWAYS_INLINE inline
|
||||
#endif
|
||||
|
||||
namespace milvus::index {
|
||||
|
||||
@ -39,6 +50,13 @@ class ScalarIndexSort : public ScalarIndex<T> {
|
||||
const storage::FileManagerContext& file_manager_context =
|
||||
storage::FileManagerContext());
|
||||
|
||||
~ScalarIndexSort() {
|
||||
if (is_mmap_ && mmap_data_ != nullptr && mmap_data_ != MAP_FAILED) {
|
||||
munmap(mmap_data_, mmap_size_);
|
||||
unlink(mmap_filepath_.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
BinarySet
|
||||
Serialize(const Config& config) override;
|
||||
|
||||
@ -90,7 +108,12 @@ class ScalarIndexSort : public ScalarIndex<T> {
|
||||
|
||||
int64_t
|
||||
Size() override {
|
||||
return (int64_t)data_.size();
|
||||
return (int64_t)size_;
|
||||
}
|
||||
|
||||
bool
|
||||
Empty() const {
|
||||
return size_ == 0;
|
||||
}
|
||||
|
||||
IndexStatsPtr
|
||||
@ -109,9 +132,9 @@ class ScalarIndexSort : public ScalarIndex<T> {
|
||||
ShouldSkip(const T lower_value, const T upper_value, const OpType op);
|
||||
|
||||
public:
|
||||
const std::vector<IndexStructure<T>>&
|
||||
const IndexStructure<T>*
|
||||
GetData() {
|
||||
return data_;
|
||||
return data_ptr_;
|
||||
}
|
||||
|
||||
bool
|
||||
@ -123,15 +146,74 @@ class ScalarIndexSort : public ScalarIndex<T> {
|
||||
LoadWithoutAssemble(const BinarySet& binary_set,
|
||||
const Config& config) override;
|
||||
|
||||
public:
|
||||
// zero-cost data acess api
|
||||
ALWAYS_INLINE const IndexStructure<T>&
|
||||
operator[](size_t idx) const {
|
||||
assert(idx < size_);
|
||||
return data_ptr_[idx];
|
||||
}
|
||||
|
||||
ALWAYS_INLINE const IndexStructure<T>*
|
||||
begin() const {
|
||||
return data_ptr_;
|
||||
}
|
||||
|
||||
using const_iterator = const IndexStructure<T>*;
|
||||
using const_reverse_iterator = std::reverse_iterator<const_iterator>;
|
||||
|
||||
ALWAYS_INLINE const_reverse_iterator
|
||||
rbegin() const {
|
||||
return const_reverse_iterator(end());
|
||||
}
|
||||
|
||||
ALWAYS_INLINE const IndexStructure<T>*
|
||||
end() const {
|
||||
return end_ptr_;
|
||||
}
|
||||
|
||||
private:
|
||||
bool is_built_;
|
||||
void
|
||||
setup_data_pointers() const {
|
||||
if (is_mmap_) {
|
||||
data_ptr_ = reinterpret_cast<IndexStructure<T>*>(mmap_data_);
|
||||
size_ = data_size_ / sizeof(IndexStructure<T>);
|
||||
end_ptr_ = data_ptr_ + size_;
|
||||
} else {
|
||||
data_ptr_ = data_.data();
|
||||
end_ptr_ = data_ptr_ + data_.size();
|
||||
size_ = data_.size();
|
||||
}
|
||||
}
|
||||
|
||||
int64_t field_id_ = 0;
|
||||
|
||||
bool is_built_ = false;
|
||||
Config config_;
|
||||
std::vector<int32_t> idx_to_offsets_; // used to retrieve.
|
||||
std::vector<IndexStructure<T>> data_;
|
||||
std::shared_ptr<storage::MemFileManagerImpl> file_manager_;
|
||||
std::shared_ptr<storage::DiskFileManagerImpl> disk_file_manager_;
|
||||
size_t total_num_rows_{0};
|
||||
// generate valid_bitset_ to speed up NotIn and IsNull and IsNotNull operate
|
||||
TargetBitmap valid_bitset_;
|
||||
|
||||
// for ram and also used for building index.
|
||||
// Note: it should not be used directly for accessing data. Use data_ptr_ instead.
|
||||
std::vector<IndexStructure<T>> data_;
|
||||
|
||||
// for mmap
|
||||
bool is_mmap_{false};
|
||||
int64_t mmap_size_ = 0;
|
||||
int64_t data_size_ = 0;
|
||||
// Note: it should not be used directly for accessing data. Use data_ptr_ instead.
|
||||
char* mmap_data_ = nullptr;
|
||||
std::string mmap_filepath_;
|
||||
|
||||
mutable const IndexStructure<T>* data_ptr_ = nullptr;
|
||||
mutable const IndexStructure<T>* end_ptr_ = nullptr;
|
||||
mutable size_t size_ = 0;
|
||||
|
||||
std::chrono::time_point<std::chrono::system_clock> index_build_begin_;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
|
||||
@ -57,9 +57,9 @@ class TextMatchIndex : public InvertedIndexTantivy<std::string> {
|
||||
|
||||
void
|
||||
AddTextsGrowing(size_t n,
|
||||
const std::string* texts,
|
||||
const bool* valids,
|
||||
int64_t offset_begin);
|
||||
const std::string* texts,
|
||||
const bool* valids,
|
||||
int64_t offset_begin);
|
||||
|
||||
void
|
||||
BuildIndexFromFieldData(const std::vector<FieldDataPtr>& field_datas,
|
||||
|
||||
@ -519,8 +519,10 @@ VectorMemIndex<T>::GetSparseVector(const DatasetPtr dataset) const {
|
||||
|
||||
template <typename T>
|
||||
void VectorMemIndex<T>::LoadFromFile(const Config& config) {
|
||||
auto local_filepath = GetValueFromConfig<std::string>(config, MMAP_FILE_PATH);
|
||||
AssertInfo(local_filepath.has_value(), "mmap filepath is empty when load index");
|
||||
auto local_filepath =
|
||||
GetValueFromConfig<std::string>(config, MMAP_FILE_PATH);
|
||||
AssertInfo(local_filepath.has_value(),
|
||||
"mmap filepath is empty when load index");
|
||||
|
||||
std::filesystem::create_directories(
|
||||
std::filesystem::path(local_filepath.value()).parent_path());
|
||||
@ -543,7 +545,8 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
|
||||
// try to read slice meta first
|
||||
std::string slice_meta_filepath;
|
||||
for (auto& idx_filepath : pending_index_files) {
|
||||
auto file_name = idx_filepath.substr(idx_filepath.find_last_of('/') + 1);
|
||||
auto file_name =
|
||||
idx_filepath.substr(idx_filepath.find_last_of('/') + 1);
|
||||
if (file_name == INDEX_FILE_SLICE_META) {
|
||||
slice_meta_filepath = idx_filepath;
|
||||
pending_index_files.erase(idx_filepath);
|
||||
@ -617,7 +620,8 @@ void VectorMemIndex<T>::LoadFromFile(const Config& config) {
|
||||
//2. write data into files
|
||||
auto start_write_file = std::chrono::system_clock::now();
|
||||
for (auto& [_, index_data] : result) {
|
||||
file_writer.Write(index_data->PayloadData(), index_data->PayloadSize());
|
||||
file_writer.Write(index_data->PayloadData(),
|
||||
index_data->PayloadSize());
|
||||
}
|
||||
write_disk_duration_sum +=
|
||||
(std::chrono::system_clock::now() - start_write_file);
|
||||
|
||||
@ -41,10 +41,10 @@
|
||||
|
||||
using namespace milvus;
|
||||
CStatus
|
||||
CreateIndexV0(enum CDataType dtype,
|
||||
const char* serialized_type_params,
|
||||
const char* serialized_index_params,
|
||||
CIndex* res_index) {
|
||||
CreateIndexForUT(enum CDataType dtype,
|
||||
const char* serialized_type_params,
|
||||
const char* serialized_index_params,
|
||||
CIndex* res_index) {
|
||||
SCOPE_CGO_CALL_METRIC();
|
||||
|
||||
auto status = CStatus();
|
||||
|
||||
@ -47,10 +47,10 @@ SerializeIndexAndUpLoad(CIndex index, ProtoLayoutInterface result);
|
||||
|
||||
// =========== Followings are used only in test ==========
|
||||
CStatus
|
||||
CreateIndexV0(enum CDataType dtype,
|
||||
const char* serialized_type_params,
|
||||
const char* serialized_index_params,
|
||||
CIndex* res_index);
|
||||
CreateIndexForUT(enum CDataType dtype,
|
||||
const char* serialized_type_params,
|
||||
const char* serialized_index_params,
|
||||
CIndex* res_index);
|
||||
|
||||
CStatus
|
||||
BuildFloatVecIndex(CIndex index, int64_t float_value_num, const float* vectors);
|
||||
|
||||
@ -84,8 +84,9 @@ class ChunkedColumnGroup {
|
||||
|
||||
int64_t
|
||||
GetNumRowsUntilChunk(int64_t chunk_id) const {
|
||||
AssertInfo(chunk_id >= 0 && chunk_id <= num_chunks_,
|
||||
"[StorageV2] chunk_id out of range: " + std::to_string(chunk_id));
|
||||
AssertInfo(
|
||||
chunk_id >= 0 && chunk_id <= num_chunks_,
|
||||
"[StorageV2] chunk_id out of range: " + std::to_string(chunk_id));
|
||||
return GetNumRowsUntilChunk()[chunk_id];
|
||||
}
|
||||
|
||||
@ -264,7 +265,8 @@ class ProxyChunkColumn : public ChunkedColumnInterface {
|
||||
std::nullopt) const override {
|
||||
if (!IsChunkedVariableColumnDataType(data_type_)) {
|
||||
ThrowInfo(ErrorCode::Unsupported,
|
||||
"[StorageV2] StringViews only supported for ChunkedVariableColumn");
|
||||
"[StorageV2] StringViews only supported for "
|
||||
"ChunkedVariableColumn");
|
||||
}
|
||||
auto chunk_wrapper = group_->GetGroupChunk(chunk_id);
|
||||
auto chunk = chunk_wrapper.get()->GetChunk(field_id_);
|
||||
@ -279,8 +281,9 @@ class ProxyChunkColumn : public ChunkedColumnInterface {
|
||||
std::optional<std::pair<int64_t, int64_t>> offset_len =
|
||||
std::nullopt) const override {
|
||||
if (!IsChunkedArrayColumnDataType(data_type_)) {
|
||||
ThrowInfo(ErrorCode::Unsupported,
|
||||
"[StorageV2] ArrayViews only supported for ChunkedArrayColumn");
|
||||
ThrowInfo(
|
||||
ErrorCode::Unsupported,
|
||||
"[StorageV2] ArrayViews only supported for ChunkedArrayColumn");
|
||||
}
|
||||
auto chunk_wrapper = group_->GetGroupChunk(chunk_id);
|
||||
auto chunk = chunk_wrapper.get()->GetChunk(field_id_);
|
||||
@ -292,9 +295,9 @@ class ProxyChunkColumn : public ChunkedColumnInterface {
|
||||
PinWrapper<std::vector<VectorArrayView>>
|
||||
VectorArrayViews(int64_t chunk_id) const override {
|
||||
if (!IsChunkedVectorArrayColumnDataType(data_type_)) {
|
||||
ThrowInfo(
|
||||
ErrorCode::Unsupported,
|
||||
"[StorageV2] VectorArrayViews only supported for ChunkedVectorArrayColumn");
|
||||
ThrowInfo(ErrorCode::Unsupported,
|
||||
"[StorageV2] VectorArrayViews only supported for "
|
||||
"ChunkedVectorArrayColumn");
|
||||
}
|
||||
auto chunk_wrapper = group_->GetGroupChunk(chunk_id);
|
||||
auto chunk = chunk_wrapper.get()->GetChunk(field_id_);
|
||||
@ -307,9 +310,9 @@ class ProxyChunkColumn : public ChunkedColumnInterface {
|
||||
ViewsByOffsets(int64_t chunk_id,
|
||||
const FixedVector<int32_t>& offsets) const override {
|
||||
if (!IsChunkedVariableColumnDataType(data_type_)) {
|
||||
ThrowInfo(
|
||||
ErrorCode::Unsupported,
|
||||
"[StorageV2] ViewsByOffsets only supported for ChunkedVariableColumn");
|
||||
ThrowInfo(ErrorCode::Unsupported,
|
||||
"[StorageV2] ViewsByOffsets only supported for "
|
||||
"ChunkedVariableColumn");
|
||||
}
|
||||
auto chunk_wrapper = group_->GetGroupChunk(chunk_id);
|
||||
auto chunk = chunk_wrapper.get()->GetChunk(field_id_);
|
||||
@ -409,11 +412,11 @@ class ProxyChunkColumn : public ChunkedColumnInterface {
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ThrowInfo(
|
||||
ErrorCode::Unsupported,
|
||||
"[StorageV2] BulkScalarValueAt is not supported for unknown scalar "
|
||||
"data type: {}",
|
||||
data_type_);
|
||||
ThrowInfo(ErrorCode::Unsupported,
|
||||
"[StorageV2] BulkScalarValueAt is not supported for "
|
||||
"unknown scalar "
|
||||
"data type: {}",
|
||||
data_type_);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -441,7 +444,8 @@ class ProxyChunkColumn : public ChunkedColumnInterface {
|
||||
if (!IsChunkedVariableColumnDataType(data_type_) ||
|
||||
data_type_ == DataType::JSON) {
|
||||
ThrowInfo(ErrorCode::Unsupported,
|
||||
"[StorageV2] BulkRawStringAt only supported for ProxyChunkColumn of "
|
||||
"[StorageV2] BulkRawStringAt only supported for "
|
||||
"ProxyChunkColumn of "
|
||||
"variable length type(except Json)");
|
||||
}
|
||||
if (offsets == nullptr) {
|
||||
@ -479,9 +483,9 @@ class ProxyChunkColumn : public ChunkedColumnInterface {
|
||||
const int64_t* offsets,
|
||||
int64_t count) const override {
|
||||
if (data_type_ != DataType::JSON) {
|
||||
ThrowInfo(
|
||||
ErrorCode::Unsupported,
|
||||
"[StorageV2] RawJsonAt only supported for ProxyChunkColumn of Json type");
|
||||
ThrowInfo(ErrorCode::Unsupported,
|
||||
"[StorageV2] RawJsonAt only supported for "
|
||||
"ProxyChunkColumn of Json type");
|
||||
}
|
||||
if (count == 0) {
|
||||
return;
|
||||
@ -506,7 +510,8 @@ class ProxyChunkColumn : public ChunkedColumnInterface {
|
||||
int64_t count) const override {
|
||||
if (!IsChunkedArrayColumnDataType(data_type_)) {
|
||||
ThrowInfo(ErrorCode::Unsupported,
|
||||
"[StorageV2] BulkArrayAt only supported for ChunkedArrayColumn");
|
||||
"[StorageV2] BulkArrayAt only supported for "
|
||||
"ChunkedArrayColumn");
|
||||
}
|
||||
auto [cids, offsets_in_chunk] = ToChunkIdAndOffset(offsets, count);
|
||||
auto ca = group_->GetGroupChunks(cids);
|
||||
|
||||
@ -561,13 +561,13 @@ std::map<std::string, std::string> diskWriteModeDirectLabel = {
|
||||
{"mode", "direct"}};
|
||||
|
||||
DEFINE_PROMETHEUS_COUNTER_FAMILY(disk_write_total_bytes,
|
||||
"[cpp]disk write total bytes");
|
||||
"[cpp]disk write total bytes");
|
||||
DEFINE_PROMETHEUS_COUNTER(disk_write_total_bytes_buffered,
|
||||
disk_write_total_bytes,
|
||||
diskWriteModeBufferedLabel);
|
||||
disk_write_total_bytes,
|
||||
diskWriteModeBufferedLabel);
|
||||
DEFINE_PROMETHEUS_COUNTER(disk_write_total_bytes_direct,
|
||||
disk_write_total_bytes,
|
||||
diskWriteModeDirectLabel);
|
||||
disk_write_total_bytes,
|
||||
diskWriteModeDirectLabel);
|
||||
|
||||
// --- file writer metrics end ---
|
||||
|
||||
|
||||
@ -1,3 +1,14 @@
|
||||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
|
||||
@ -201,7 +201,11 @@ ConfigureTieredStorage(const CacheWarmupPolicy scalarFieldCacheWarmupPolicy,
|
||||
disk_high_watermark_bytes,
|
||||
disk_max_bytes},
|
||||
evictionEnabled,
|
||||
{cache_touch_window_ms, eviction_interval_ms, overloaded_memory_threshold_percentage, max_disk_usage_percentage, loading_memory_factor});
|
||||
{cache_touch_window_ms,
|
||||
eviction_interval_ms,
|
||||
overloaded_memory_threshold_percentage,
|
||||
max_disk_usage_percentage,
|
||||
loading_memory_factor});
|
||||
}
|
||||
|
||||
} // namespace milvus::segcore
|
||||
|
||||
@ -108,6 +108,7 @@ set(MILVUS_TEST_FILES
|
||||
test_json_flat_index.cpp
|
||||
test_vector_array.cpp
|
||||
test_ngram_query.cpp
|
||||
test_stlsort_index.cpp
|
||||
)
|
||||
|
||||
if ( INDEX_ENGINE STREQUAL "cardinal" )
|
||||
|
||||
@ -157,7 +157,8 @@ TEST_F(FileWriterTest, WriteWithoutFinishWithDirectIO) {
|
||||
EXPECT_NE(content.size(), data.size());
|
||||
EXPECT_EQ(content.size(), kBufferSize * 2);
|
||||
EXPECT_NE(content, std::vector<char>(data.begin(), data.end()));
|
||||
EXPECT_EQ(content, std::vector<char>(data.begin(), data.begin() + kBufferSize * 2));
|
||||
EXPECT_EQ(content,
|
||||
std::vector<char>(data.begin(), data.begin() + kBufferSize * 2));
|
||||
}
|
||||
|
||||
// Test writing data with size slightly less than buffer size
|
||||
@ -416,12 +417,10 @@ TEST_F(FileWriterTest, LargeBufferSizeWriteWithDirectIO) {
|
||||
// Tese config FileWriterConfig with unknown mode
|
||||
TEST_F(FileWriterTest, UnknownModeWriteWithDirectIO) {
|
||||
uint8_t mode = 2;
|
||||
EXPECT_NO_THROW(
|
||||
{
|
||||
FileWriter::SetMode(
|
||||
static_cast<FileWriter::WriteMode>(mode));
|
||||
FileWriter::SetBufferSize(kBufferSize);
|
||||
});
|
||||
EXPECT_NO_THROW({
|
||||
FileWriter::SetMode(static_cast<FileWriter::WriteMode>(mode));
|
||||
FileWriter::SetBufferSize(kBufferSize);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_F(FileWriterTest, HalfAlignedDataWriteWithDirectIO) {
|
||||
@ -840,9 +839,8 @@ TEST_F(FileWriterTest, ConcurrentAccessToFileWriterConfig) {
|
||||
threads.emplace_back([i]() {
|
||||
// Each thread sets different executor configurations
|
||||
FileWriteWorkerPool::GetInstance().Configure(i + 1);
|
||||
FileWriter::SetMode(
|
||||
i % 2 == 0 ? FileWriter::WriteMode::BUFFERED
|
||||
: FileWriter::WriteMode::DIRECT);
|
||||
FileWriter::SetMode(i % 2 == 0 ? FileWriter::WriteMode::BUFFERED
|
||||
: FileWriter::WriteMode::DIRECT);
|
||||
FileWriter::SetBufferSize(4096 * (i + 1));
|
||||
});
|
||||
}
|
||||
|
||||
@ -59,7 +59,7 @@ TestVecIndex() {
|
||||
CBinarySet binary_set;
|
||||
CIndex copy_index;
|
||||
|
||||
status = CreateIndexV0(
|
||||
status = CreateIndexForUT(
|
||||
dtype, type_params_str.c_str(), index_params_str.c_str(), &index);
|
||||
ASSERT_EQ(milvus::Success, status.error_code);
|
||||
|
||||
@ -94,7 +94,7 @@ TestVecIndex() {
|
||||
status = SerializeIndexToBinarySet(index, &binary_set);
|
||||
ASSERT_EQ(milvus::Success, status.error_code);
|
||||
|
||||
status = CreateIndexV0(
|
||||
status = CreateIndexForUT(
|
||||
dtype, type_params_str.c_str(), index_params_str.c_str(), ©_index);
|
||||
ASSERT_EQ(milvus::Success, status.error_code);
|
||||
|
||||
@ -142,10 +142,10 @@ TEST(CBoolIndexTest, All) {
|
||||
CIndex copy_index;
|
||||
|
||||
{
|
||||
status = CreateIndexV0(dtype,
|
||||
type_params_str.c_str(),
|
||||
index_params_str.c_str(),
|
||||
&index);
|
||||
status = CreateIndexForUT(dtype,
|
||||
type_params_str.c_str(),
|
||||
index_params_str.c_str(),
|
||||
&index);
|
||||
ASSERT_EQ(milvus::Success, status.error_code);
|
||||
}
|
||||
{
|
||||
@ -158,10 +158,10 @@ TEST(CBoolIndexTest, All) {
|
||||
ASSERT_EQ(milvus::Success, status.error_code);
|
||||
}
|
||||
{
|
||||
status = CreateIndexV0(dtype,
|
||||
type_params_str.c_str(),
|
||||
index_params_str.c_str(),
|
||||
©_index);
|
||||
status = CreateIndexForUT(dtype,
|
||||
type_params_str.c_str(),
|
||||
index_params_str.c_str(),
|
||||
©_index);
|
||||
ASSERT_EQ(milvus::Success, status.error_code);
|
||||
}
|
||||
{
|
||||
@ -200,10 +200,10 @@ TEST(CInt64IndexTest, All) {
|
||||
CIndex copy_index;
|
||||
|
||||
{
|
||||
status = CreateIndexV0(dtype,
|
||||
type_params_str.c_str(),
|
||||
index_params_str.c_str(),
|
||||
&index);
|
||||
status = CreateIndexForUT(dtype,
|
||||
type_params_str.c_str(),
|
||||
index_params_str.c_str(),
|
||||
&index);
|
||||
ASSERT_EQ(milvus::Success, status.error_code);
|
||||
}
|
||||
{
|
||||
@ -215,10 +215,10 @@ TEST(CInt64IndexTest, All) {
|
||||
ASSERT_EQ(milvus::Success, status.error_code);
|
||||
}
|
||||
{
|
||||
status = CreateIndexV0(dtype,
|
||||
type_params_str.c_str(),
|
||||
index_params_str.c_str(),
|
||||
©_index);
|
||||
status = CreateIndexForUT(dtype,
|
||||
type_params_str.c_str(),
|
||||
index_params_str.c_str(),
|
||||
©_index);
|
||||
ASSERT_EQ(milvus::Success, status.error_code);
|
||||
}
|
||||
{
|
||||
@ -259,10 +259,10 @@ TEST(CStringIndexTest, All) {
|
||||
CIndex copy_index;
|
||||
|
||||
{
|
||||
status = CreateIndexV0(dtype,
|
||||
type_params_str.c_str(),
|
||||
index_params_str.c_str(),
|
||||
&index);
|
||||
status = CreateIndexForUT(dtype,
|
||||
type_params_str.c_str(),
|
||||
index_params_str.c_str(),
|
||||
&index);
|
||||
ASSERT_EQ(milvus::Success, status.error_code);
|
||||
}
|
||||
{
|
||||
@ -275,10 +275,10 @@ TEST(CStringIndexTest, All) {
|
||||
ASSERT_EQ(milvus::Success, status.error_code);
|
||||
}
|
||||
{
|
||||
status = CreateIndexV0(dtype,
|
||||
type_params_str.c_str(),
|
||||
index_params_str.c_str(),
|
||||
©_index);
|
||||
status = CreateIndexForUT(dtype,
|
||||
type_params_str.c_str(),
|
||||
index_params_str.c_str(),
|
||||
©_index);
|
||||
ASSERT_EQ(milvus::Success, status.error_code);
|
||||
}
|
||||
{
|
||||
|
||||
@ -69,13 +69,13 @@ test_run() {
|
||||
int64_t index_version = 4000;
|
||||
int64_t lack_binlog_row = 100;
|
||||
|
||||
auto field_meta = gen_field_meta(collection_id,
|
||||
partition_id,
|
||||
segment_id,
|
||||
field_id,
|
||||
dtype,
|
||||
element_type,
|
||||
nullable);
|
||||
auto field_meta = milvus::segcore::gen_field_meta(collection_id,
|
||||
partition_id,
|
||||
segment_id,
|
||||
field_id,
|
||||
dtype,
|
||||
element_type,
|
||||
nullable);
|
||||
auto index_meta =
|
||||
gen_index_meta(segment_id, field_id, index_build_id, index_version);
|
||||
|
||||
@ -480,13 +480,13 @@ test_string() {
|
||||
int64_t index_version = 4001;
|
||||
int64_t lack_binlog_row = 100;
|
||||
|
||||
auto field_meta = gen_field_meta(collection_id,
|
||||
partition_id,
|
||||
segment_id,
|
||||
field_id,
|
||||
dtype,
|
||||
DataType::NONE,
|
||||
nullable);
|
||||
auto field_meta = milvus::segcore::gen_field_meta(collection_id,
|
||||
partition_id,
|
||||
segment_id,
|
||||
field_id,
|
||||
dtype,
|
||||
DataType::NONE,
|
||||
nullable);
|
||||
auto index_meta =
|
||||
gen_index_meta(segment_id, field_id, index_build_id, index_version);
|
||||
|
||||
|
||||
@ -41,50 +41,6 @@
|
||||
using namespace milvus;
|
||||
|
||||
namespace milvus::test {
|
||||
auto
|
||||
generate_field_meta(int64_t collection_id = 1,
|
||||
int64_t partition_id = 2,
|
||||
int64_t segment_id = 3,
|
||||
int64_t field_id = 101,
|
||||
DataType data_type = DataType::NONE,
|
||||
DataType element_type = DataType::NONE,
|
||||
bool nullable = false) -> storage::FieldDataMeta {
|
||||
auto meta = storage::FieldDataMeta{
|
||||
.collection_id = collection_id,
|
||||
.partition_id = partition_id,
|
||||
.segment_id = segment_id,
|
||||
.field_id = field_id,
|
||||
};
|
||||
meta.field_schema.set_data_type(
|
||||
static_cast<proto::schema::DataType>(data_type));
|
||||
meta.field_schema.set_element_type(
|
||||
static_cast<proto::schema::DataType>(element_type));
|
||||
meta.field_schema.set_nullable(nullable);
|
||||
return meta;
|
||||
}
|
||||
|
||||
auto
|
||||
generate_index_meta(int64_t segment_id = 3,
|
||||
int64_t field_id = 101,
|
||||
int64_t index_build_id = 1000,
|
||||
int64_t index_version = 10000) -> storage::IndexMeta {
|
||||
return storage::IndexMeta{
|
||||
.segment_id = segment_id,
|
||||
.field_id = field_id,
|
||||
.build_id = index_build_id,
|
||||
.index_version = index_version,
|
||||
};
|
||||
}
|
||||
|
||||
auto
|
||||
generate_local_storage_config(const std::string& root_path)
|
||||
-> storage::StorageConfig {
|
||||
auto ret = storage::StorageConfig{};
|
||||
ret.storage_type = "local";
|
||||
ret.root_path = root_path;
|
||||
return ret;
|
||||
}
|
||||
|
||||
struct ChunkManagerWrapper {
|
||||
ChunkManagerWrapper(storage::ChunkManagerPtr cm) : cm_(cm) {
|
||||
}
|
||||
@ -118,13 +74,13 @@ class JsonFlatIndexTest : public ::testing::Test {
|
||||
int64_t index_build_id = 4000;
|
||||
int64_t index_version = 4000;
|
||||
|
||||
field_meta_ = test::generate_field_meta(
|
||||
field_meta_ = milvus::segcore::gen_field_meta(
|
||||
collection_id, partition_id, segment_id, field_id, DataType::JSON);
|
||||
index_meta_ = test::generate_index_meta(
|
||||
segment_id, field_id, index_build_id, index_version);
|
||||
index_meta_ =
|
||||
gen_index_meta(segment_id, field_id, index_build_id, index_version);
|
||||
|
||||
std::string root_path = "/tmp/test-json-flat-index/";
|
||||
auto storage_config = test::generate_local_storage_config(root_path);
|
||||
auto storage_config = gen_local_storage_config(root_path);
|
||||
cm_ = storage::CreateChunkManager(storage_config);
|
||||
|
||||
json_data_ = {
|
||||
|
||||
@ -11,7 +11,6 @@
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <string>
|
||||
#include <random>
|
||||
|
||||
#include "common/Schema.h"
|
||||
#include "test_utils/GenExprProto.h"
|
||||
@ -30,50 +29,6 @@ using namespace milvus::query;
|
||||
using namespace milvus::segcore;
|
||||
using namespace milvus::exec;
|
||||
|
||||
auto
|
||||
generate_field_meta(int64_t collection_id = 1,
|
||||
int64_t partition_id = 2,
|
||||
int64_t segment_id = 3,
|
||||
int64_t field_id = 101,
|
||||
DataType data_type = DataType::NONE,
|
||||
DataType element_type = DataType::NONE,
|
||||
bool nullable = false) -> storage::FieldDataMeta {
|
||||
auto meta = storage::FieldDataMeta{
|
||||
.collection_id = collection_id,
|
||||
.partition_id = partition_id,
|
||||
.segment_id = segment_id,
|
||||
.field_id = field_id,
|
||||
};
|
||||
meta.field_schema.set_data_type(
|
||||
static_cast<proto::schema::DataType>(data_type));
|
||||
meta.field_schema.set_element_type(
|
||||
static_cast<proto::schema::DataType>(element_type));
|
||||
meta.field_schema.set_nullable(nullable);
|
||||
return meta;
|
||||
}
|
||||
|
||||
auto
|
||||
generate_index_meta(int64_t segment_id = 3,
|
||||
int64_t field_id = 101,
|
||||
int64_t index_build_id = 1000,
|
||||
int64_t index_version = 10000) -> storage::IndexMeta {
|
||||
return storage::IndexMeta{
|
||||
.segment_id = segment_id,
|
||||
.field_id = field_id,
|
||||
.build_id = index_build_id,
|
||||
.index_version = index_version,
|
||||
};
|
||||
}
|
||||
|
||||
auto
|
||||
generate_local_storage_config(const std::string& root_path)
|
||||
-> storage::StorageConfig {
|
||||
auto ret = storage::StorageConfig{};
|
||||
ret.storage_type = "local";
|
||||
ret.root_path = root_path;
|
||||
return ret;
|
||||
}
|
||||
|
||||
void
|
||||
test_ngram_with_data(const boost::container::vector<std::string>& data,
|
||||
const std::string& literal,
|
||||
@ -90,24 +45,20 @@ test_ngram_with_data(const boost::container::vector<std::string>& data,
|
||||
auto schema = std::make_shared<Schema>();
|
||||
auto field_id = schema->AddDebugField("ngram", DataType::VARCHAR);
|
||||
|
||||
auto field_meta = generate_field_meta(collection_id,
|
||||
partition_id,
|
||||
segment_id,
|
||||
field_id.get(),
|
||||
DataType::VARCHAR,
|
||||
DataType::NONE,
|
||||
false);
|
||||
auto index_meta = generate_index_meta(
|
||||
auto field_meta = gen_field_meta(collection_id,
|
||||
partition_id,
|
||||
segment_id,
|
||||
field_id.get(),
|
||||
DataType::VARCHAR,
|
||||
DataType::NONE,
|
||||
false);
|
||||
auto index_meta = gen_index_meta(
|
||||
segment_id, field_id.get(), index_build_id, index_version);
|
||||
|
||||
std::string root_path = "/tmp/test-inverted-index/";
|
||||
auto storage_config = generate_local_storage_config(root_path);
|
||||
auto storage_config = gen_local_storage_config(root_path);
|
||||
auto cm = CreateChunkManager(storage_config);
|
||||
|
||||
std::random_device rd;
|
||||
std::mt19937 gen(rd());
|
||||
std::uniform_int_distribution<> distrib(1, 100);
|
||||
|
||||
size_t nb = data.size();
|
||||
|
||||
auto field_data = storage::CreateFieldData(DataType::VARCHAR, false);
|
||||
@ -149,8 +100,8 @@ test_ngram_with_data(const boost::container::vector<std::string>& data,
|
||||
|
||||
{
|
||||
Config config;
|
||||
config["index_type"] = milvus::index::INVERTED_INDEX_TYPE;
|
||||
config["insert_files"] = std::vector<std::string>{log_path};
|
||||
config[milvus::index::INDEX_TYPE] = milvus::index::INVERTED_INDEX_TYPE;
|
||||
config[INSERT_FILES_KEY] = std::vector<std::string>{log_path};
|
||||
|
||||
auto ngram_params = index::NgramParams{
|
||||
.loading_index = false,
|
||||
@ -170,10 +121,6 @@ test_ngram_with_data(const boost::container::vector<std::string>& data,
|
||||
}
|
||||
|
||||
{
|
||||
index::CreateIndexInfo index_info{};
|
||||
index_info.index_type = milvus::index::INVERTED_INDEX_TYPE;
|
||||
index_info.field_type = DataType::VARCHAR;
|
||||
|
||||
Config config;
|
||||
config[milvus::index::INDEX_FILES] = index_files;
|
||||
config[milvus::LOAD_PRIORITY] =
|
||||
|
||||
89
internal/core/unittest/test_stlsort_index.cpp
Normal file
89
internal/core/unittest/test_stlsort_index.cpp
Normal file
@ -0,0 +1,89 @@
|
||||
#include <gtest/gtest.h>
|
||||
#include <cstdint>
|
||||
#include "index/ScalarIndexSort.h"
|
||||
#include "common/Types.h"
|
||||
using namespace milvus;
|
||||
using namespace milvus::index;
|
||||
|
||||
void
|
||||
test_stlsort_for_range(
|
||||
const std::vector<int64_t>& data,
|
||||
DataType data_type,
|
||||
bool enable_mmap,
|
||||
std::function<TargetBitmap(
|
||||
const std::shared_ptr<ScalarIndexSort<int64_t>>&)> exec_expr,
|
||||
const std::vector<bool>& expected_result) {
|
||||
size_t nb = data.size();
|
||||
BinarySet binary_set;
|
||||
{
|
||||
Config config;
|
||||
|
||||
auto index = std::make_shared<index::ScalarIndexSort<int64_t>>();
|
||||
index->Build(nb, data.data());
|
||||
|
||||
binary_set = index->Serialize(config);
|
||||
}
|
||||
{
|
||||
Config config;
|
||||
config[milvus::index::ENABLE_MMAP] = enable_mmap;
|
||||
|
||||
auto index = std::make_shared<index::ScalarIndexSort<int64_t>>();
|
||||
index->Load(binary_set, config);
|
||||
|
||||
auto cnt = index->Count();
|
||||
ASSERT_EQ(cnt, nb);
|
||||
auto bitset = exec_expr(index);
|
||||
for (size_t i = 0; i < nb; i++) {
|
||||
ASSERT_EQ(bitset[i], expected_result[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
TEST(StlSortIndexTest, TestRange) {
|
||||
std::vector<int64_t> data = {10, 2, 6, 5, 9, 3, 7, 8, 4, 1};
|
||||
{
|
||||
std::vector<bool> expected_result = {
|
||||
false, false, true, true, false, true, true, false, true, false};
|
||||
auto exec_expr =
|
||||
[](const std::shared_ptr<ScalarIndexSort<int64_t>>& index) {
|
||||
return index->Range(3, true, 7, true);
|
||||
};
|
||||
|
||||
test_stlsort_for_range(
|
||||
data, DataType::INT64, false, exec_expr, expected_result);
|
||||
|
||||
test_stlsort_for_range(
|
||||
data, DataType::INT64, true, exec_expr, expected_result);
|
||||
}
|
||||
|
||||
{
|
||||
std::vector<bool> expected_result(data.size(), false);
|
||||
auto exec_expr =
|
||||
[](const std::shared_ptr<ScalarIndexSort<int64_t>>& index) {
|
||||
return index->Range(10, false, 70, true);
|
||||
};
|
||||
|
||||
test_stlsort_for_range(
|
||||
data, DataType::INT64, false, exec_expr, expected_result);
|
||||
|
||||
test_stlsort_for_range(
|
||||
data, DataType::INT64, true, exec_expr, expected_result);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(StlSortIndexTest, TestIn) {
|
||||
std::vector<int64_t> data = {10, 2, 6, 5, 9, 3, 7, 8, 4, 1};
|
||||
std::vector<bool> expected_result = {
|
||||
false, false, false, true, false, true, true, false, false, false};
|
||||
|
||||
std::vector<int64_t> values = {3, 5, 7};
|
||||
|
||||
auto exec_expr =
|
||||
[&values](const std::shared_ptr<ScalarIndexSort<int64_t>>& index) {
|
||||
return index->In(values.size(), values.data());
|
||||
};
|
||||
test_stlsort_for_range(
|
||||
data, DataType::INT64, false, exec_expr, expected_result);
|
||||
|
||||
test_stlsort_for_range(
|
||||
data, DataType::INT64, true, exec_expr, expected_result);
|
||||
}
|
||||
@ -138,26 +138,28 @@ TEST(Tracer, ParseHeaders) {
|
||||
// Test empty headers
|
||||
auto headers_map = parseHeaders("");
|
||||
ASSERT_TRUE(headers_map.empty());
|
||||
|
||||
|
||||
// Test simple JSON headers
|
||||
std::string json_headers = R"({"Authorization": "Bearer token123", "Content-Type": "application/json"})";
|
||||
std::string json_headers =
|
||||
R"({"Authorization": "Bearer token123", "Content-Type": "application/json"})";
|
||||
headers_map = parseHeaders(json_headers);
|
||||
ASSERT_EQ(headers_map.size(), 2);
|
||||
ASSERT_EQ(headers_map["Authorization"], "Bearer token123");
|
||||
ASSERT_EQ(headers_map["Content-Type"], "application/json");
|
||||
|
||||
|
||||
// Test JSON with whitespace
|
||||
std::string json_headers_with_spaces = R"({ "key1" : "value1" , "key2" : "value2" })";
|
||||
std::string json_headers_with_spaces =
|
||||
R"({ "key1" : "value1" , "key2" : "value2" })";
|
||||
headers_map = parseHeaders(json_headers_with_spaces);
|
||||
ASSERT_EQ(headers_map.size(), 2);
|
||||
ASSERT_EQ(headers_map["key1"], "value1");
|
||||
ASSERT_EQ(headers_map["key2"], "value2");
|
||||
|
||||
|
||||
// Test invalid JSON
|
||||
std::string invalid_json = "invalid json string";
|
||||
headers_map = parseHeaders(invalid_json);
|
||||
ASSERT_TRUE(headers_map.empty());
|
||||
|
||||
|
||||
// Test empty JSON object
|
||||
std::string empty_json = "{}";
|
||||
headers_map = parseHeaders(empty_json);
|
||||
@ -169,19 +171,20 @@ TEST(Tracer, OTLPHttpExporter) {
|
||||
config->exporter = "otlp";
|
||||
config->otlpMethod = "http";
|
||||
config->otlpEndpoint = "http://localhost:4318/v1/traces";
|
||||
config->otlpHeaders = R"({"Authorization": "Bearer test-token", "Content-Type": "application/json"})";
|
||||
config->otlpHeaders =
|
||||
R"({"Authorization": "Bearer test-token", "Content-Type": "application/json"})";
|
||||
config->nodeID = 1;
|
||||
|
||||
|
||||
initTelemetry(*config);
|
||||
auto span = StartSpan("test_otlp_http");
|
||||
ASSERT_TRUE(span->IsRecording());
|
||||
|
||||
|
||||
// Test with empty headers
|
||||
config->otlpHeaders = "";
|
||||
initTelemetry(*config);
|
||||
span = StartSpan("test_otlp_http_empty_headers");
|
||||
ASSERT_TRUE(span->IsRecording());
|
||||
|
||||
|
||||
// Test with invalid JSON headers
|
||||
config->otlpHeaders = "invalid json";
|
||||
initTelemetry(*config);
|
||||
@ -197,17 +200,17 @@ TEST(Tracer, OTLPGrpcExporter) {
|
||||
config->otlpHeaders = R"({"Authorization": "Bearer grpc-token"})";
|
||||
config->oltpSecure = false;
|
||||
config->nodeID = 1;
|
||||
|
||||
|
||||
initTelemetry(*config);
|
||||
auto span = StartSpan("test_otlp_grpc");
|
||||
ASSERT_TRUE(span->IsRecording());
|
||||
|
||||
|
||||
// Test with secure connection
|
||||
config->oltpSecure = true;
|
||||
initTelemetry(*config);
|
||||
span = StartSpan("test_otlp_grpc_secure");
|
||||
ASSERT_TRUE(span->IsRecording());
|
||||
|
||||
|
||||
// Test with empty headers
|
||||
config->otlpHeaders = "";
|
||||
config->oltpSecure = false;
|
||||
@ -224,7 +227,7 @@ TEST(Tracer, OTLPLegacyConfiguration) {
|
||||
config->otlpHeaders = R"({"legacy": "header"})";
|
||||
config->oltpSecure = false;
|
||||
config->nodeID = 1;
|
||||
|
||||
|
||||
initTelemetry(*config);
|
||||
auto span = StartSpan("test_otlp_legacy");
|
||||
ASSERT_TRUE(span->IsRecording());
|
||||
@ -236,7 +239,7 @@ TEST(Tracer, OTLPInvalidMethod) {
|
||||
config->otlpMethod = "invalid_method";
|
||||
config->otlpEndpoint = "localhost:4317";
|
||||
config->nodeID = 1;
|
||||
|
||||
|
||||
initTelemetry(*config);
|
||||
auto span = StartSpan("test_otlp_invalid");
|
||||
// Should fall back to noop provider when export creation fails
|
||||
@ -255,7 +258,7 @@ TEST(Tracer, OTLPComplexHeaders) {
|
||||
"Accept": "application/json"
|
||||
})";
|
||||
config->nodeID = 1;
|
||||
|
||||
|
||||
initTelemetry(*config);
|
||||
auto span = StartSpan("test_otlp_complex_headers");
|
||||
ASSERT_TRUE(span->IsRecording());
|
||||
@ -265,7 +268,7 @@ TEST(Tracer, OTLPEmptyExporter) {
|
||||
auto config = std::make_shared<TraceConfig>();
|
||||
config->exporter = ""; // empty exporter
|
||||
config->nodeID = 1;
|
||||
|
||||
|
||||
initTelemetry(*config);
|
||||
auto span = StartSpan("test_empty_exporter");
|
||||
// Should fall back to noop provider
|
||||
@ -276,7 +279,7 @@ TEST(Tracer, OTLPInvalidExporter) {
|
||||
auto config = std::make_shared<TraceConfig>();
|
||||
config->exporter = "invalid_exporter";
|
||||
config->nodeID = 1;
|
||||
|
||||
|
||||
initTelemetry(*config);
|
||||
auto span = StartSpan("test_invalid_exporter");
|
||||
// Should fall back to noop provider
|
||||
@ -285,22 +288,23 @@ TEST(Tracer, OTLPInvalidExporter) {
|
||||
|
||||
TEST(Tracer, OTLPHeadersParsingEdgeCases) {
|
||||
// Test with whitespace in JSON
|
||||
std::string json_with_spaces = R"({ "key1" : "value1" , "key2" : "value2" })";
|
||||
std::string json_with_spaces =
|
||||
R"({ "key1" : "value1" , "key2" : "value2" })";
|
||||
auto headers_map = parseHeaders(json_with_spaces);
|
||||
ASSERT_EQ(headers_map.size(), 2);
|
||||
ASSERT_EQ(headers_map["key1"], "value1");
|
||||
ASSERT_EQ(headers_map["key2"], "value2");
|
||||
|
||||
|
||||
// Test with nested JSON (should fail gracefully)
|
||||
std::string nested_json = R"({"key": {"nested": "value"}})";
|
||||
headers_map = parseHeaders(nested_json);
|
||||
ASSERT_TRUE(headers_map.empty());
|
||||
|
||||
|
||||
// Test with array JSON (should fail gracefully)
|
||||
std::string array_json = R"(["header1", "header2"])";
|
||||
headers_map = parseHeaders(array_json);
|
||||
ASSERT_TRUE(headers_map.empty());
|
||||
|
||||
|
||||
// Test with null JSON
|
||||
std::string null_json = "null";
|
||||
headers_map = parseHeaders(null_json);
|
||||
|
||||
@ -356,8 +356,7 @@ GenerateRandomSparseFloatVector(size_t rows,
|
||||
return tensor;
|
||||
}
|
||||
|
||||
inline SchemaPtr
|
||||
CreateTestSchema() {
|
||||
inline SchemaPtr CreateTestSchema() {
|
||||
auto schema = std::make_shared<milvus::Schema>();
|
||||
auto bool_field =
|
||||
schema->AddDebugField("bool", milvus::DataType::BOOL, true);
|
||||
|
||||
@ -81,7 +81,7 @@ func NewCgoIndex(dtype schemapb.DataType, typeParams, indexParams map[string]str
|
||||
|
||||
var indexPtr C.CIndex
|
||||
cintDType := uint32(dtype)
|
||||
status := C.CreateIndexV0(cintDType, typeParamsPointer, indexParamsPointer, &indexPtr)
|
||||
status := C.CreateIndexForUT(cintDType, typeParamsPointer, indexParamsPointer, &indexPtr)
|
||||
if err := HandleCStatus(&status, "failed to create index"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user