mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: remove some meta cache for json shredding (#45888)
#42533 Signed-off-by: luzhang <luzhang@zilliz.com> Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
parent
5f5560d042
commit
d5bd17315c
@ -722,9 +722,7 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonStats() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if (!index->CanSkipShared(pointer)) {
|
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
|
||||||
}
|
|
||||||
cached_index_chunk_id_ = 0;
|
cached_index_chunk_id_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -216,16 +216,14 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegmentByStats() {
|
|||||||
res_view |= temp_valid_view;
|
res_view |= temp_valid_view;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!index->CanSkipShared(pointer)) {
|
// process shared data, need to check if the value is empty
|
||||||
// process shared data, need to check if the value is empty
|
// which match the semantics of exists in Json.h
|
||||||
// which match the semantics of exists in Json.h
|
index->ExecuteForSharedData(
|
||||||
index->ExecuteForSharedData(
|
op_ctx_,
|
||||||
op_ctx_,
|
pointer,
|
||||||
pointer,
|
[&](BsonView bson, uint32_t row_id, uint32_t offset) {
|
||||||
[&](BsonView bson, uint32_t row_id, uint32_t offset) {
|
res_view[row_id] = !bson.IsBsonValueEmpty(offset);
|
||||||
res_view[row_id] = !bson.IsBsonValueEmpty(offset);
|
});
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TargetBitmap result;
|
TargetBitmap result;
|
||||||
|
|||||||
@ -508,9 +508,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsByStats() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if (!index->CanSkipShared(pointer)) {
|
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
|
||||||
}
|
|
||||||
cached_index_chunk_id_ = 0;
|
cached_index_chunk_id_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -709,9 +707,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArrayByStats() {
|
|||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
};
|
};
|
||||||
if (!index->CanSkipShared(pointer)) {
|
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
|
||||||
}
|
|
||||||
cached_index_chunk_id_ = 0;
|
cached_index_chunk_id_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1046,9 +1042,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByStats() {
|
|||||||
}
|
}
|
||||||
res_view[row_offset] = tmp_elements.empty();
|
res_view[row_offset] = tmp_elements.empty();
|
||||||
};
|
};
|
||||||
if (!index->CanSkipShared(pointer)) {
|
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
|
||||||
}
|
|
||||||
cached_index_chunk_id_ = 0;
|
cached_index_chunk_id_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1377,9 +1371,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByStats() {
|
|||||||
}
|
}
|
||||||
res_view[row_offset] = tmp_elements_index.size() == 0;
|
res_view[row_offset] = tmp_elements_index.size() == 0;
|
||||||
};
|
};
|
||||||
if (!index->CanSkipShared(pointer)) {
|
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
|
||||||
}
|
|
||||||
cached_index_chunk_id_ = 0;
|
cached_index_chunk_id_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1584,9 +1576,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByStats() {
|
|||||||
res_view[row_offset] =
|
res_view[row_offset] =
|
||||||
exist_elements_index.size() == elements.size();
|
exist_elements_index.size() == elements.size();
|
||||||
};
|
};
|
||||||
if (!index->CanSkipShared(pointer)) {
|
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
|
||||||
}
|
|
||||||
cached_index_chunk_id_ = 0;
|
cached_index_chunk_id_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1890,9 +1880,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByStats() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if (!index->CanSkipShared(pointer)) {
|
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
|
||||||
}
|
|
||||||
cached_index_chunk_id_ = 0;
|
cached_index_chunk_id_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -687,9 +687,7 @@ PhyTermFilterExpr::ExecJsonInVariableByStats() {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if (!index->CanSkipShared(pointer)) {
|
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
|
||||||
}
|
|
||||||
cached_index_chunk_id_ = 0;
|
cached_index_chunk_id_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1265,9 +1265,7 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonByStats() {
|
|||||||
ms);
|
ms);
|
||||||
});
|
});
|
||||||
|
|
||||||
if (!index->CanSkipShared(pointer, target_types)) {
|
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// for NotEqual: flip the result
|
// for NotEqual: flip the result
|
||||||
|
|||||||
@ -809,8 +809,12 @@ JsonKeyStats::GetCommonMetaFromParquet(const std::string& file) {
|
|||||||
try {
|
try {
|
||||||
auto layout_type_json = nlohmann::json::parse(value);
|
auto layout_type_json = nlohmann::json::parse(value);
|
||||||
for (const auto& [k, v] : layout_type_json.items()) {
|
for (const auto& [k, v] : layout_type_json.items()) {
|
||||||
field_layout_type_map_[k] = JsonKeyLayoutTypeFromString(v);
|
auto layout_type = JsonKeyLayoutTypeFromString(v);
|
||||||
key_data_type_map_[k] = GetJsonTypeFromKeyName(k);
|
// Only store metadata for shredding columns (TYPED/DYNAMIC),
|
||||||
|
// skip SHARED keys to save memory
|
||||||
|
if (layout_type == JsonKeyLayoutType::SHARED) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
key_field_map_[GetKeyFromColumnName(k)].insert(k);
|
key_field_map_[GetKeyFromColumnName(k)].insert(k);
|
||||||
}
|
}
|
||||||
} catch (const std::exception& e) {
|
} catch (const std::exception& e) {
|
||||||
|
|||||||
@ -168,6 +168,9 @@ class JsonKeyStats : public ScalarIndex<std::string> {
|
|||||||
const std::string& path,
|
const std::string& path,
|
||||||
std::function<void(BsonView bson, uint32_t row_id, uint32_t offset)>
|
std::function<void(BsonView bson, uint32_t row_id, uint32_t offset)>
|
||||||
func) {
|
func) {
|
||||||
|
if (shared_column_ == nullptr || bson_inverted_index_ == nullptr) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
bson_inverted_index_->TermQuery(
|
bson_inverted_index_->TermQuery(
|
||||||
path,
|
path,
|
||||||
[this, &func, op_ctx](const uint32_t* row_id_array,
|
[this, &func, op_ctx](const uint32_t* row_id_array,
|
||||||
@ -181,6 +184,9 @@ class JsonKeyStats : public ScalarIndex<std::string> {
|
|||||||
void
|
void
|
||||||
ExecuteExistsPathForSharedData(const std::string& path,
|
ExecuteExistsPathForSharedData(const std::string& path,
|
||||||
TargetBitmapView bitset) {
|
TargetBitmapView bitset) {
|
||||||
|
if (bson_inverted_index_ == nullptr) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
bson_inverted_index_->TermQueryEach(
|
bson_inverted_index_->TermQueryEach(
|
||||||
path, [&bitset](uint32_t row_id, uint32_t offset) {
|
path, [&bitset](uint32_t row_id, uint32_t offset) {
|
||||||
bitset[row_id] = true;
|
bitset[row_id] = true;
|
||||||
@ -302,53 +308,6 @@ class JsonKeyStats : public ScalarIndex<std::string> {
|
|||||||
return processed_size;
|
return processed_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Whether shared columns can be skipped for this path (type-agnostic)
|
|
||||||
bool
|
|
||||||
CanSkipShared(const std::string& path) {
|
|
||||||
auto it = key_field_map_.find(path);
|
|
||||||
if (it == key_field_map_.end()) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
const auto& field_names = it->second;
|
|
||||||
for (const auto& field_name : field_names) {
|
|
||||||
if (field_layout_type_map_[field_name] ==
|
|
||||||
JsonKeyLayoutType::SHARED) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Whether shared columns can be skipped for this path with type filter
|
|
||||||
bool
|
|
||||||
CanSkipShared(const std::string& path,
|
|
||||||
const std::set<milvus::index::JSONType>& target_types) {
|
|
||||||
auto it = key_field_map_.find(path);
|
|
||||||
if (it == key_field_map_.end()) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
const auto& field_names = it->second;
|
|
||||||
for (const auto& field_name : field_names) {
|
|
||||||
if (field_layout_type_map_[field_name] !=
|
|
||||||
JsonKeyLayoutType::SHARED) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!target_types.empty() &&
|
|
||||||
target_types.find(key_data_type_map_[field_name]) ==
|
|
||||||
target_types.end()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::set<std::string>
|
std::set<std::string>
|
||||||
GetShreddingFields(const std::string& pointer) {
|
GetShreddingFields(const std::string& pointer) {
|
||||||
std::set<std::string> fields;
|
std::set<std::string> fields;
|
||||||
@ -679,14 +638,10 @@ class JsonKeyStats : public ScalarIndex<std::string> {
|
|||||||
|
|
||||||
milvus::proto::common::LoadPriority load_priority_;
|
milvus::proto::common::LoadPriority load_priority_;
|
||||||
// some meta cache for searching
|
// some meta cache for searching
|
||||||
// json_path -> [json_path_int, json_path_array, json_path_object, ...], only for all keys
|
// json_path -> [json_path_int, json_path_string, ...], only for shredding columns
|
||||||
std::unordered_map<std::string, std::set<std::string>> key_field_map_;
|
std::unordered_map<std::string, std::set<std::string>> key_field_map_;
|
||||||
// field_name -> data_type, such as json_path_int -> JSONType::INT64, only for real shredding columns
|
// field_name -> data_type, such as json_path_int -> JSONType::INT64, only for real shredding columns
|
||||||
std::unordered_map<std::string, JSONType> shred_field_data_type_map_;
|
std::unordered_map<std::string, JSONType> shred_field_data_type_map_;
|
||||||
// key_name -> data_type, such as json_path_int -> JSONType::INT64, for all keys
|
|
||||||
std::unordered_map<std::string, JSONType> key_data_type_map_;
|
|
||||||
// field_name -> key_type, such as json_path_int -> JsonKeyLayoutType::TYPED, for all keys
|
|
||||||
std::unordered_map<std::string, JsonKeyLayoutType> field_layout_type_map_;
|
|
||||||
// field_name -> field_id, such as json_path_int -> 1001
|
// field_name -> field_id, such as json_path_int -> 1001
|
||||||
std::unordered_map<std::string, int64_t> field_name_to_id_map_;
|
std::unordered_map<std::string, int64_t> field_name_to_id_map_;
|
||||||
// field_id -> field_name, such as 1001 -> json_path_int
|
// field_id -> field_name, such as 1001 -> json_path_int
|
||||||
|
|||||||
@ -267,10 +267,7 @@ TEST_P(JsonKeyStatsTest, TestExecutorForGettingValid) {
|
|||||||
index_->ExecutorForGettingValid(nullptr, field, valid_res_view);
|
index_->ExecutorForGettingValid(nullptr, field, valid_res_view);
|
||||||
EXPECT_EQ(processed_size, size_);
|
EXPECT_EQ(processed_size, size_);
|
||||||
}
|
}
|
||||||
if (!index_->CanSkipShared(path)) {
|
index_->ExecuteExistsPathForSharedData(path, valid_res_view);
|
||||||
std::cout << "can not skip shared" << std::endl;
|
|
||||||
index_->ExecuteExistsPathForSharedData(path, valid_res_view);
|
|
||||||
}
|
|
||||||
std::cout << "valid_res.count(): " << valid_res.count() << std::endl;
|
std::cout << "valid_res.count(): " << valid_res.count() << std::endl;
|
||||||
if (nullable_) {
|
if (nullable_) {
|
||||||
EXPECT_EQ(valid_res.count(), 400);
|
EXPECT_EQ(valid_res.count(), 400);
|
||||||
@ -323,15 +320,6 @@ TEST_P(JsonKeyStatsTest, TestGetShreddingFields) {
|
|||||||
EXPECT_FALSE(typed_fields.empty());
|
EXPECT_FALSE(typed_fields.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_P(JsonKeyStatsTest, TestCanSkipShared) {
|
|
||||||
std::string path = "/int";
|
|
||||||
std::set<JSONType> target_types = {JSONType::INT64};
|
|
||||||
EXPECT_TRUE(index_->CanSkipShared(path, target_types));
|
|
||||||
|
|
||||||
target_types = {JSONType::STRING};
|
|
||||||
EXPECT_TRUE(index_->CanSkipShared(path, target_types));
|
|
||||||
}
|
|
||||||
|
|
||||||
class JsonKeyStatsUploadLoadTest : public ::testing::Test {
|
class JsonKeyStatsUploadLoadTest : public ::testing::Test {
|
||||||
protected:
|
protected:
|
||||||
void
|
void
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user