mirror of
https://gitee.com/milvus-io/milvus.git
synced 2025-12-06 17:18:35 +08:00
enhance: remove some meta cache for json shredding (#45889)
pr: #45888 Signed-off-by: luzhang <luzhang@zilliz.com> Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
parent
8ee392a682
commit
731178a97f
@ -717,9 +717,7 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonStats() {
|
||||
}
|
||||
}
|
||||
};
|
||||
if (!index->CanSkipShared(pointer)) {
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
}
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
cached_index_chunk_id_ = 0;
|
||||
}
|
||||
|
||||
|
||||
@ -211,16 +211,14 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegmentByStats() {
|
||||
res_view |= temp_valid_view;
|
||||
}
|
||||
|
||||
if (!index->CanSkipShared(pointer)) {
|
||||
// process shared data, need to check if the value is empty
|
||||
// which match the semantics of exists in Json.h
|
||||
index->ExecuteForSharedData(
|
||||
op_ctx_,
|
||||
pointer,
|
||||
[&](BsonView bson, uint32_t row_id, uint32_t offset) {
|
||||
res_view[row_id] = !bson.IsBsonValueEmpty(offset);
|
||||
});
|
||||
}
|
||||
// process shared data, need to check if the value is empty
|
||||
// which match the semantics of exists in Json.h
|
||||
index->ExecuteForSharedData(
|
||||
op_ctx_,
|
||||
pointer,
|
||||
[&](BsonView bson, uint32_t row_id, uint32_t offset) {
|
||||
res_view[row_id] = !bson.IsBsonValueEmpty(offset);
|
||||
});
|
||||
}
|
||||
|
||||
TargetBitmap result;
|
||||
|
||||
@ -503,9 +503,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsByStats() {
|
||||
}
|
||||
}
|
||||
};
|
||||
if (!index->CanSkipShared(pointer)) {
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
}
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
cached_index_chunk_id_ = 0;
|
||||
}
|
||||
|
||||
@ -704,9 +702,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArrayByStats() {
|
||||
}
|
||||
return false;
|
||||
};
|
||||
if (!index->CanSkipShared(pointer)) {
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
}
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
cached_index_chunk_id_ = 0;
|
||||
}
|
||||
|
||||
@ -1041,9 +1037,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByStats() {
|
||||
}
|
||||
res_view[row_offset] = tmp_elements.empty();
|
||||
};
|
||||
if (!index->CanSkipShared(pointer)) {
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
}
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
cached_index_chunk_id_ = 0;
|
||||
}
|
||||
|
||||
@ -1372,9 +1366,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByStats() {
|
||||
}
|
||||
res_view[row_offset] = tmp_elements_index.size() == 0;
|
||||
};
|
||||
if (!index->CanSkipShared(pointer)) {
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
}
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
cached_index_chunk_id_ = 0;
|
||||
}
|
||||
|
||||
@ -1579,9 +1571,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByStats() {
|
||||
res_view[row_offset] =
|
||||
exist_elements_index.size() == elements.size();
|
||||
};
|
||||
if (!index->CanSkipShared(pointer)) {
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
}
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
cached_index_chunk_id_ = 0;
|
||||
}
|
||||
|
||||
@ -1885,9 +1875,7 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByStats() {
|
||||
}
|
||||
}
|
||||
};
|
||||
if (!index->CanSkipShared(pointer)) {
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
}
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
cached_index_chunk_id_ = 0;
|
||||
}
|
||||
|
||||
|
||||
@ -682,9 +682,7 @@ PhyTermFilterExpr::ExecJsonInVariableByStats() {
|
||||
return;
|
||||
}
|
||||
};
|
||||
if (!index->CanSkipShared(pointer)) {
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
}
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
cached_index_chunk_id_ = 0;
|
||||
}
|
||||
|
||||
|
||||
@ -1260,9 +1260,7 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonByStats() {
|
||||
ms);
|
||||
});
|
||||
|
||||
if (!index->CanSkipShared(pointer, target_types)) {
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
}
|
||||
index->ExecuteForSharedData(op_ctx_, pointer, shared_executor);
|
||||
}
|
||||
|
||||
// for NotEqual: flip the result
|
||||
|
||||
@ -809,8 +809,12 @@ JsonKeyStats::GetCommonMetaFromParquet(const std::string& file) {
|
||||
try {
|
||||
auto layout_type_json = nlohmann::json::parse(value);
|
||||
for (const auto& [k, v] : layout_type_json.items()) {
|
||||
field_layout_type_map_[k] = JsonKeyLayoutTypeFromString(v);
|
||||
key_data_type_map_[k] = GetJsonTypeFromKeyName(k);
|
||||
auto layout_type = JsonKeyLayoutTypeFromString(v);
|
||||
// Only store metadata for shredding columns (TYPED/DYNAMIC),
|
||||
// skip SHARED keys to save memory
|
||||
if (layout_type == JsonKeyLayoutType::SHARED) {
|
||||
continue;
|
||||
}
|
||||
key_field_map_[GetKeyFromColumnName(k)].insert(k);
|
||||
}
|
||||
} catch (const std::exception& e) {
|
||||
@ -1062,4 +1066,4 @@ JsonKeyStats::Upload(const Config& config) {
|
||||
std::move(index_files));
|
||||
}
|
||||
|
||||
} // namespace milvus::index
|
||||
} // namespace milvus::index
|
||||
|
||||
@ -168,6 +168,9 @@ class JsonKeyStats : public ScalarIndex<std::string> {
|
||||
const std::string& path,
|
||||
std::function<void(BsonView bson, uint32_t row_id, uint32_t offset)>
|
||||
func) {
|
||||
if (shared_column_ == nullptr || bson_inverted_index_ == nullptr) {
|
||||
return;
|
||||
}
|
||||
bson_inverted_index_->TermQuery(
|
||||
path,
|
||||
[this, &func, op_ctx](const uint32_t* row_id_array,
|
||||
@ -181,6 +184,9 @@ class JsonKeyStats : public ScalarIndex<std::string> {
|
||||
void
|
||||
ExecuteExistsPathForSharedData(const std::string& path,
|
||||
TargetBitmapView bitset) {
|
||||
if (bson_inverted_index_ == nullptr) {
|
||||
return;
|
||||
}
|
||||
bson_inverted_index_->TermQueryEach(
|
||||
path, [&bitset](uint32_t row_id, uint32_t offset) {
|
||||
bitset[row_id] = true;
|
||||
@ -302,53 +308,6 @@ class JsonKeyStats : public ScalarIndex<std::string> {
|
||||
return processed_size;
|
||||
}
|
||||
|
||||
// Whether shared columns can be skipped for this path (type-agnostic)
|
||||
bool
|
||||
CanSkipShared(const std::string& path) {
|
||||
auto it = key_field_map_.find(path);
|
||||
if (it == key_field_map_.end()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const auto& field_names = it->second;
|
||||
for (const auto& field_name : field_names) {
|
||||
if (field_layout_type_map_[field_name] ==
|
||||
JsonKeyLayoutType::SHARED) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// Whether shared columns can be skipped for this path with type filter
|
||||
bool
|
||||
CanSkipShared(const std::string& path,
|
||||
const std::set<milvus::index::JSONType>& target_types) {
|
||||
auto it = key_field_map_.find(path);
|
||||
if (it == key_field_map_.end()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const auto& field_names = it->second;
|
||||
for (const auto& field_name : field_names) {
|
||||
if (field_layout_type_map_[field_name] !=
|
||||
JsonKeyLayoutType::SHARED) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!target_types.empty() &&
|
||||
target_types.find(key_data_type_map_[field_name]) ==
|
||||
target_types.end()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::set<std::string>
|
||||
GetShreddingFields(const std::string& pointer) {
|
||||
std::set<std::string> fields;
|
||||
@ -679,14 +638,10 @@ class JsonKeyStats : public ScalarIndex<std::string> {
|
||||
|
||||
milvus::proto::common::LoadPriority load_priority_;
|
||||
// some meta cache for searching
|
||||
// json_path -> [json_path_int, json_path_array, json_path_object, ...], only for all keys
|
||||
// json_path -> [json_path_int, json_path_string, ...], only for shredding columns
|
||||
std::unordered_map<std::string, std::set<std::string>> key_field_map_;
|
||||
// field_name -> data_type, such as json_path_int -> JSONType::INT64, only for real shredding columns
|
||||
std::unordered_map<std::string, JSONType> shred_field_data_type_map_;
|
||||
// key_name -> data_type, such as json_path_int -> JSONType::INT64, for all keys
|
||||
std::unordered_map<std::string, JSONType> key_data_type_map_;
|
||||
// field_name -> key_type, such as json_path_int -> JsonKeyLayoutType::TYPED, for all keys
|
||||
std::unordered_map<std::string, JsonKeyLayoutType> field_layout_type_map_;
|
||||
// field_name -> field_id, such as json_path_int -> 1001
|
||||
std::unordered_map<std::string, int64_t> field_name_to_id_map_;
|
||||
// field_id -> field_name, such as 1001 -> json_path_int
|
||||
|
||||
@ -267,10 +267,7 @@ TEST_P(JsonKeyStatsTest, TestExecutorForGettingValid) {
|
||||
index_->ExecutorForGettingValid(nullptr, field, valid_res_view);
|
||||
EXPECT_EQ(processed_size, size_);
|
||||
}
|
||||
if (!index_->CanSkipShared(path)) {
|
||||
std::cout << "can not skip shared" << std::endl;
|
||||
index_->ExecuteExistsPathForSharedData(path, valid_res_view);
|
||||
}
|
||||
index_->ExecuteExistsPathForSharedData(path, valid_res_view);
|
||||
std::cout << "valid_res.count(): " << valid_res.count() << std::endl;
|
||||
if (nullable_) {
|
||||
EXPECT_EQ(valid_res.count(), 400);
|
||||
@ -323,15 +320,6 @@ TEST_P(JsonKeyStatsTest, TestGetShreddingFields) {
|
||||
EXPECT_FALSE(typed_fields.empty());
|
||||
}
|
||||
|
||||
TEST_P(JsonKeyStatsTest, TestCanSkipShared) {
|
||||
std::string path = "/int";
|
||||
std::set<JSONType> target_types = {JSONType::INT64};
|
||||
EXPECT_TRUE(index_->CanSkipShared(path, target_types));
|
||||
|
||||
target_types = {JSONType::STRING};
|
||||
EXPECT_TRUE(index_->CanSkipShared(path, target_types));
|
||||
}
|
||||
|
||||
class JsonKeyStatsUploadLoadTest : public ::testing::Test {
|
||||
protected:
|
||||
void
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user