fix: JsonStats filter by conjunctExpr and improve the task slot calculation logic (#41459)

Optimized JSON filter execution by introducing
ProcessJsonStatsChunkPos() for unified position calculation and
GetNextBatchSize() for better batch processing.
Improved JSON key generation by replacing manual path joining with
milvus::Json::pointer() and by adjusting the slot size calculation for
JSON key index jobs.
Updated the task slot calculation logic in calculateStatsTaskSlot() to
handle the increased resource needs of JSON key index jobs.
issue: https://github.com/milvus-io/milvus/issues/41378
https://github.com/milvus-io/milvus/issues/41218

---------

Signed-off-by: Xianhui.Lin <xianhui.lin@zilliz.com>
This commit is contained in:
Xianhui Lin 2025-04-23 16:30:37 +08:00 committed by GitHub
parent 655cc7fe06
commit 3d4889586d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 252 additions and 247 deletions

View File

@ -305,19 +305,17 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForData(EvalCtx& context) {
TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size);
size_t processed_cursor = 0;
auto execute_sub_batch = [lower_inclusive,
upper_inclusive,
&processed_cursor,
&bitmap_input]<FilterType filter_type =
FilterType::sequential>(
const T* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
HighPrecisionType val1,
HighPrecisionType val2) {
auto execute_sub_batch =
[ lower_inclusive, upper_inclusive, &processed_cursor, &
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
const T* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
HighPrecisionType val1,
HighPrecisionType val2) {
if (lower_inclusive && upper_inclusive) {
BinaryRangeElementFunc<T, true, true, filter_type> func;
func(val1,
@ -449,20 +447,22 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(EvalCtx& context) {
auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);
size_t processed_cursor = 0;
auto execute_sub_batch = [lower_inclusive,
upper_inclusive,
pointer,
&bitmap_input,
&processed_cursor]<FilterType filter_type =
FilterType::sequential>(
const milvus::Json* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
ValueType val1,
ValueType val2) {
auto execute_sub_batch =
[
lower_inclusive,
upper_inclusive,
pointer,
&bitmap_input,
&processed_cursor
]<FilterType filter_type = FilterType::sequential>(
const milvus::Json* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
ValueType val1,
ValueType val2) {
if (lower_inclusive && upper_inclusive) {
BinaryRangeElementFuncForJson<ValueType, true, true, filter_type>
func;
@ -778,8 +778,8 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonForIndex() {
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
current_data_chunk_pos_ += real_batch_size;
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
}
@ -820,20 +820,18 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(EvalCtx& context) {
}
size_t processed_cursor = 0;
auto execute_sub_batch = [lower_inclusive,
upper_inclusive,
&processed_cursor,
&bitmap_input]<FilterType filter_type =
FilterType::sequential>(
const milvus::ArrayView* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
ValueType val1,
ValueType val2,
int index) {
auto execute_sub_batch =
[ lower_inclusive, upper_inclusive, &processed_cursor, &
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
const milvus::ArrayView* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
ValueType val1,
ValueType val2,
int index) {
if (lower_inclusive && upper_inclusive) {
BinaryRangeElementFuncForArray<ValueType, true, true, filter_type>
func;

View File

@ -164,11 +164,14 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegment(EvalCtx& context) {
VectorPtr
PhyExistsFilterExpr::EvalJsonExistsForDataSegmentForIndex() {
auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_
? active_count_ - current_data_chunk_pos_
: batch_size_;
auto real_batch_size = GetNextBatchSize();
if (real_batch_size == 0) {
return nullptr;
}
auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);
if (cached_index_chunk_id_ != 0) {
cached_index_chunk_id_ = 0;
const segcore::SegmentInternalInterface* segment = nullptr;
if (segment_->type() == SegmentType::Growing) {
segment =
@ -196,12 +199,11 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegmentForIndex() {
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
current_data_chunk_pos_ += real_batch_size;
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
}

View File

@ -218,6 +218,8 @@ class SegmentExpr : public Expr {
if (processed_size >= batch_size_) {
current_data_chunk_ = i;
current_data_chunk_pos_ = data_pos + size;
current_data_global_pos_ =
current_data_global_pos_ + processed_size;
break;
}
// }
@ -229,6 +231,7 @@ class SegmentExpr : public Expr {
auto size =
std::min(active_count_ - current_data_chunk_pos_, batch_size_);
current_data_chunk_pos_ += size;
current_data_global_pos_ += size;
} else {
int64_t processed_size = 0;
for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) {
@ -245,6 +248,8 @@ class SegmentExpr : public Expr {
if (processed_size >= batch_size_) {
current_data_chunk_ = i;
current_data_chunk_pos_ = data_pos + size;
current_data_global_pos_ =
current_data_global_pos_ + processed_size;
break;
}
}
@ -1266,6 +1271,7 @@ class SegmentExpr : public Expr {
// because expr maybe called for every batch.
int64_t current_data_chunk_{0};
int64_t current_data_chunk_pos_{0};
int64_t current_data_global_pos_{0};
int64_t current_index_chunk_{0};
int64_t current_index_chunk_pos_{0};
int64_t size_per_chunk_{0};

View File

@ -433,8 +433,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsByKeyIndex() {
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
current_data_chunk_pos_ += real_batch_size;
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
}
@ -619,8 +619,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArrayByKeyIndex() {
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
current_data_chunk_pos_ += real_batch_size;
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
}
@ -898,8 +898,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByKeyIndex() {
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
current_data_chunk_pos_ += real_batch_size;
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
}
@ -1207,8 +1207,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByKeyIndex() {
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
current_data_chunk_pos_ += real_batch_size;
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
}
@ -1403,8 +1403,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByKeyIndex() {
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
current_data_chunk_pos_ += real_batch_size;
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
}
@ -1687,8 +1687,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByKeyIndex() {
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
current_data_chunk_pos_ += real_batch_size;
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
}

View File

@ -680,8 +680,8 @@ PhyTermFilterExpr::ExecJsonInVariableByKeyIndex() {
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
current_data_chunk_pos_ += real_batch_size;
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
}

View File

@ -319,9 +319,8 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(EvalCtx& context) {
}
int processed_cursor = 0;
auto execute_sub_batch =
[op_type,
&processed_cursor,
&bitmap_input]<FilterType filter_type = FilterType::sequential>(
[ op_type, &processed_cursor, &
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
const milvus::ArrayView* data,
const bool* valid_data,
const int32_t* offsets,
@ -330,152 +329,151 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(EvalCtx& context) {
TargetBitmapView valid_res,
ValueType val,
int index) {
switch (op_type) {
case proto::plan::GreaterThan: {
UnaryElementFuncForArray<ValueType,
proto::plan::GreaterThan,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
case proto::plan::GreaterEqual: {
UnaryElementFuncForArray<ValueType,
proto::plan::GreaterEqual,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
case proto::plan::LessThan: {
UnaryElementFuncForArray<ValueType,
proto::plan::LessThan,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
case proto::plan::LessEqual: {
UnaryElementFuncForArray<ValueType,
proto::plan::LessEqual,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
case proto::plan::Equal: {
UnaryElementFuncForArray<ValueType,
proto::plan::Equal,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
case proto::plan::NotEqual: {
UnaryElementFuncForArray<ValueType,
proto::plan::NotEqual,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
case proto::plan::PrefixMatch: {
UnaryElementFuncForArray<ValueType,
proto::plan::PrefixMatch,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
case proto::plan::Match: {
UnaryElementFuncForArray<ValueType,
proto::plan::Match,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
default:
PanicInfo(
OpTypeInvalid,
fmt::format(
"unsupported operator type for unary expr: {}",
op_type));
switch (op_type) {
case proto::plan::GreaterThan: {
UnaryElementFuncForArray<ValueType,
proto::plan::GreaterThan,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
processed_cursor += size;
};
case proto::plan::GreaterEqual: {
UnaryElementFuncForArray<ValueType,
proto::plan::GreaterEqual,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
case proto::plan::LessThan: {
UnaryElementFuncForArray<ValueType,
proto::plan::LessThan,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
case proto::plan::LessEqual: {
UnaryElementFuncForArray<ValueType,
proto::plan::LessEqual,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
case proto::plan::Equal: {
UnaryElementFuncForArray<ValueType,
proto::plan::Equal,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
case proto::plan::NotEqual: {
UnaryElementFuncForArray<ValueType,
proto::plan::NotEqual,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
case proto::plan::PrefixMatch: {
UnaryElementFuncForArray<ValueType,
proto::plan::PrefixMatch,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
case proto::plan::Match: {
UnaryElementFuncForArray<ValueType,
proto::plan::Match,
filter_type>
func;
func(data,
valid_data,
size,
val,
index,
res,
valid_res,
bitmap_input,
processed_cursor,
offsets);
break;
}
default:
PanicInfo(
OpTypeInvalid,
fmt::format("unsupported operator type for unary expr: {}",
op_type));
}
processed_cursor += size;
};
int64_t processed_size;
if (has_offset_input_) {
processed_size =
@ -547,7 +545,7 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(EvalCtx& context,
};
} else {
auto size_per_chunk = segment_->size_per_chunk();
retrieve = [ size_per_chunk, this ](int64_t offset) -> auto{
retrieve = [ size_per_chunk, this ](int64_t offset) -> auto {
auto chunk_idx = offset / size_per_chunk;
auto chunk_offset = offset % size_per_chunk;
const auto& chunk =
@ -680,18 +678,16 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(EvalCtx& context) {
} while (false)
int processed_cursor = 0;
auto execute_sub_batch = [op_type,
pointer,
&processed_cursor,
&bitmap_input]<FilterType filter_type =
FilterType::sequential>(
const milvus::Json* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
ExprValueType val) {
auto execute_sub_batch =
[ op_type, pointer, &processed_cursor, &
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
const milvus::Json* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
ExprValueType val) {
bool has_bitmap_input = !bitmap_input.empty();
switch (op_type) {
case proto::plan::GreaterThan: {
@ -932,9 +928,10 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonForIndex() {
std::conditional_t<std::is_same_v<ExprValueType, std::string>,
std::string_view,
ExprValueType>;
auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_
? active_count_ - current_data_chunk_pos_
: batch_size_;
auto real_batch_size = GetNextBatchSize();
if (real_batch_size == 0) {
return nullptr;
}
auto pointerpath = milvus::Json::pointer(expr_->column_.nested_path_);
auto pointerpair = SplitAtFirstSlashDigit(pointerpath);
std::string pointer = pointerpair.first;
@ -1175,6 +1172,7 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonForIndex() {
auto op_type = expr_->op_type_;
if (cached_index_chunk_id_ != 0) {
cached_index_chunk_id_ = 0;
const segcore::SegmentInternalInterface* segment = nullptr;
if (segment_->type() == SegmentType::Growing) {
segment =
@ -1402,12 +1400,11 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonForIndex() {
is_strong_consistency,
filter_func)
.clone();
cached_index_chunk_id_ = 0;
}
TargetBitmap result;
result.append(
cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
current_data_chunk_pos_ += real_batch_size;
cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
MoveCursor();
return std::make_shared<ColumnVector>(std::move(result),
TargetBitmap(real_batch_size, true));
}
@ -1608,17 +1605,16 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(EvalCtx& context) {
auto expr_type = expr_->op_type_;
size_t processed_cursor = 0;
auto execute_sub_batch = [expr_type,
&processed_cursor,
&bitmap_input]<FilterType filter_type =
FilterType::sequential>(
const T* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
IndexInnerType val) {
auto execute_sub_batch =
[ expr_type, &processed_cursor, &
bitmap_input ]<FilterType filter_type = FilterType::sequential>(
const T* data,
const bool* valid_data,
const int32_t* offsets,
const int size,
TargetBitmapView res,
TargetBitmapView valid_res,
IndexInnerType val) {
switch (expr_type) {
case proto::plan::GreaterThan: {
UnaryElementFunc<T, proto::plan::GreaterThan, filter_type> func;

View File

@ -30,10 +30,7 @@ JsonKeyStatsInvertedIndex::AddJSONEncodeValue(
uint16_t length,
int32_t value,
std::map<std::string, std::vector<int64_t>>& mp) {
std::string key = "";
if (!paths.empty()) {
key = std::string("/") + Join(paths, "/");
}
std::string key = milvus::Json::pointer(paths);
LOG_DEBUG(
"insert inverted key: {}, flag: {}, type: {}, row_id: {}, offset: "
"{}, length:{}, value:{}",
@ -382,7 +379,8 @@ JsonKeyStatsInvertedIndex::Load(milvus::tracer::TraceContext ctx,
disk_file_manager_->CacheJsonKeyIndexToDisk(index_files.value());
AssertInfo(
tantivy_index_exist(path_.c_str()), "index not exist: {}", path_);
wrapper_ = std::make_shared<TantivyIndexWrapper>(path_.c_str(), milvus::index::SetBitset);
wrapper_ = std::make_shared<TantivyIndexWrapper>(path_.c_str(),
milvus::index::SetBitset);
LOG_INFO("load json key index done for field id:{} with dir:{}",
field_id_,
path_);

View File

@ -296,7 +296,12 @@ func (jm *statsJobManager) SubmitStatsTask(originSegmentID, targetSegmentID int6
if err != nil {
return err
}
taskSlot := calculateStatsTaskSlot(originSegment.getSegmentSize())
originSegmentSize := originSegment.getSegmentSize()
if subJobType == indexpb.StatsSubJob_JsonKeyIndexJob {
originSegmentSize = originSegment.getSegmentSize() * 2
}
taskSlot := calculateStatsTaskSlot(originSegmentSize)
t := &indexpb.StatsTask{
CollectionID: originSegment.GetCollectionID(),
PartitionID: originSegment.GetPartitionID(),