diff --git a/internal/core/src/exec/expression/BinaryRangeExpr.cpp b/internal/core/src/exec/expression/BinaryRangeExpr.cpp
index 57f5cdbc5d..07078ce579 100644
--- a/internal/core/src/exec/expression/BinaryRangeExpr.cpp
+++ b/internal/core/src/exec/expression/BinaryRangeExpr.cpp
@@ -305,19 +305,17 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForData(EvalCtx& context) {
     TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size);
     size_t processed_cursor = 0;
-    auto execute_sub_batch = [lower_inclusive,
-                              upper_inclusive,
-                              &processed_cursor,
-                              &bitmap_input](
-                                 const T* data,
-                                 const bool* valid_data,
-                                 const int32_t* offsets,
-                                 const int size,
-                                 TargetBitmapView res,
-                                 TargetBitmapView valid_res,
-                                 HighPrecisionType val1,
-                                 HighPrecisionType val2) {
+    auto execute_sub_batch =
+        [ lower_inclusive, upper_inclusive, &processed_cursor, &
+          bitmap_input ](
+            const T* data,
+            const bool* valid_data,
+            const int32_t* offsets,
+            const int size,
+            TargetBitmapView res,
+            TargetBitmapView valid_res,
+            HighPrecisionType val1,
+            HighPrecisionType val2) {
         if (lower_inclusive && upper_inclusive) {
             BinaryRangeElementFunc func;
             func(val1,
@@ -449,20 +447,22 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(EvalCtx& context) {
     auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);
     size_t processed_cursor = 0;
-    auto execute_sub_batch = [lower_inclusive,
-                              upper_inclusive,
-                              pointer,
-                              &bitmap_input,
-                              &processed_cursor](
-                                 const milvus::Json* data,
-                                 const bool* valid_data,
-                                 const int32_t* offsets,
-                                 const int size,
-                                 TargetBitmapView res,
-                                 TargetBitmapView valid_res,
-                                 ValueType val1,
-                                 ValueType val2) {
+    auto execute_sub_batch =
+        [
+            lower_inclusive,
+            upper_inclusive,
+            pointer,
+            &bitmap_input,
+            &processed_cursor
+        ](
+            const milvus::Json* data,
+            const bool* valid_data,
+            const int32_t* offsets,
+            const int size,
+            TargetBitmapView res,
+            TargetBitmapView valid_res,
+            ValueType val1,
+            ValueType val2) {
         if (lower_inclusive && upper_inclusive) {
             BinaryRangeElementFuncForJson func;
@@ -778,8 +778,8 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJsonForIndex() {
     }
     TargetBitmap result;
     result.append(
-        cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
-    current_data_chunk_pos_ += real_batch_size;
+        cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
+    MoveCursor();
     return std::make_shared(std::move(result),
                             TargetBitmap(real_batch_size, true));
 }
@@ -820,20 +820,18 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(EvalCtx& context) {
     }
     size_t processed_cursor = 0;
-    auto execute_sub_batch = [lower_inclusive,
-                              upper_inclusive,
-                              &processed_cursor,
-                              &bitmap_input](
-                                 const milvus::ArrayView* data,
-                                 const bool* valid_data,
-                                 const int32_t* offsets,
-                                 const int size,
-                                 TargetBitmapView res,
-                                 TargetBitmapView valid_res,
-                                 ValueType val1,
-                                 ValueType val2,
-                                 int index) {
+    auto execute_sub_batch =
+        [ lower_inclusive, upper_inclusive, &processed_cursor, &
+          bitmap_input ](
+            const milvus::ArrayView* data,
+            const bool* valid_data,
+            const int32_t* offsets,
+            const int size,
+            TargetBitmapView res,
+            TargetBitmapView valid_res,
+            ValueType val1,
+            ValueType val2,
+            int index) {
         if (lower_inclusive && upper_inclusive) {
             BinaryRangeElementFuncForArray func;
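Note: the only functional change in this file is the pair of lines in ExecRangeVisitorImplForJsonForIndex; the rest of the hunks are lambda reformatting. The cached bitmap covers the whole segment, so the per-batch window is now taken at the segment-wide position and the cursor is advanced through MoveCursor() instead of bumping the chunk-local position. A minimal sketch of that slicing, with simplified types (std::vector<bool> standing in for TargetBitmap; everything except the two member names shown in the hunk is illustrative):

```cpp
// Sketch only: cached_index_chunk_res_ holds a result for the whole segment,
// so each batch must be cut out at the global row position. A chunk-local
// position restarts at every chunk boundary and would slice the wrong window
// once a segment has more than one chunk, which appears to be what this
// change addresses.
#include <algorithm>
#include <cstdint>
#include <vector>

std::vector<bool>
NextBatch(const std::vector<bool>& segment_wide_result,  // cached_index_chunk_res_
          int64_t& global_pos,                           // current_data_global_pos_
          int64_t batch_size) {
    const int64_t n = std::min<int64_t>(
        batch_size,
        static_cast<int64_t>(segment_wide_result.size()) - global_pos);
    std::vector<bool> window(segment_wide_result.begin() + global_pos,
                             segment_wide_result.begin() + global_pos + n);
    global_pos += n;  // the part of MoveCursor() that matters here
    return window;
}
```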
diff --git a/internal/core/src/exec/expression/ExistsExpr.cpp b/internal/core/src/exec/expression/ExistsExpr.cpp
index 12577d3f1a..17bf3ba726 100644
--- a/internal/core/src/exec/expression/ExistsExpr.cpp
+++ b/internal/core/src/exec/expression/ExistsExpr.cpp
@@ -164,11 +164,14 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegment(EvalCtx& context) {
 
 VectorPtr
 PhyExistsFilterExpr::EvalJsonExistsForDataSegmentForIndex() {
-    auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_
-                               ? active_count_ - current_data_chunk_pos_
-                               : batch_size_;
+    auto real_batch_size = GetNextBatchSize();
+    if (real_batch_size == 0) {
+        return nullptr;
+    }
+
     auto pointer = milvus::Json::pointer(expr_->column_.nested_path_);
     if (cached_index_chunk_id_ != 0) {
+        cached_index_chunk_id_ = 0;
         const segcore::SegmentInternalInterface* segment = nullptr;
         if (segment_->type() == SegmentType::Growing) {
             segment =
@@ -196,12 +199,11 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegmentForIndex() {
                 is_strong_consistency,
                 filter_func)
                 .clone();
-        cached_index_chunk_id_ = 0;
     }
     TargetBitmap result;
     result.append(
-        cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
-    current_data_chunk_pos_ += real_batch_size;
+        cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
+    MoveCursor();
     return std::make_shared(std::move(result),
                             TargetBitmap(real_batch_size, true));
 }
diff --git a/internal/core/src/exec/expression/Expr.h b/internal/core/src/exec/expression/Expr.h
index bd075dc389..15b84166fa 100644
--- a/internal/core/src/exec/expression/Expr.h
+++ b/internal/core/src/exec/expression/Expr.h
@@ -218,6 +218,8 @@ class SegmentExpr : public Expr {
                 if (processed_size >= batch_size_) {
                     current_data_chunk_ = i;
                     current_data_chunk_pos_ = data_pos + size;
+                    current_data_global_pos_ =
+                        current_data_global_pos_ + processed_size;
                     break;
                 }
             // }
@@ -229,6 +231,7 @@ class SegmentExpr : public Expr {
             auto size =
                 std::min(active_count_ - current_data_chunk_pos_, batch_size_);
             current_data_chunk_pos_ += size;
+            current_data_global_pos_ += size;
         } else {
             int64_t processed_size = 0;
             for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) {
@@ -245,6 +248,8 @@ class SegmentExpr : public Expr {
                 if (processed_size >= batch_size_) {
                     current_data_chunk_ = i;
                     current_data_chunk_pos_ = data_pos + size;
+                    current_data_global_pos_ =
+                        current_data_global_pos_ + processed_size;
                     break;
                 }
             }
@@ -1266,6 +1271,7 @@ class SegmentExpr : public Expr {
     // because expr maybe called for every batch.
     int64_t current_data_chunk_{0};
     int64_t current_data_chunk_pos_{0};
+    int64_t current_data_global_pos_{0};
     int64_t current_index_chunk_{0};
     int64_t current_index_chunk_pos_{0};
     int64_t size_per_chunk_{0};
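The Expr.h hunks add the segment-wide cursor that the call sites above now read. A simplified model of the bookkeeping, assuming GetNextBatchSize() is essentially the clamp that the removed inline ternaries computed (the real SegmentExpr also tracks the per-chunk position and chunk index, omitted here):

```cpp
// Simplified cursor model (assumed shape, not the actual SegmentExpr code).
// current_data_chunk_pos_ is local to the chunk being scanned; the new
// current_data_global_pos_ never resets, so it can address a bitmap that was
// computed for the whole segment.
#include <algorithm>
#include <cstdint>

struct Cursor {
    int64_t active_count = 0;  // total rows in the segment
    int64_t batch_size = 8192;
    int64_t global_pos = 0;    // current_data_global_pos_

    // Roughly what GetNextBatchSize() returns: full batches until the tail,
    // then the short remainder, then 0 (which triggers the early nullptr).
    int64_t NextBatchSize() const {
        return std::min(batch_size,
                        std::max<int64_t>(active_count - global_pos, 0));
    }

    // Roughly what MoveCursor() now has to do in addition to the per-chunk
    // advance shown in the hunks above.
    void MoveCursor() { global_pos += NextBatchSize(); }
};

// e.g. active_count = 10'000: batches of 8192, 1808, then 0.
```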
diff --git a/internal/core/src/exec/expression/JsonContainsExpr.cpp b/internal/core/src/exec/expression/JsonContainsExpr.cpp
index e30a050199..8933a5b9a8 100644
--- a/internal/core/src/exec/expression/JsonContainsExpr.cpp
+++ b/internal/core/src/exec/expression/JsonContainsExpr.cpp
@@ -433,8 +433,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsByKeyIndex() {
     }
     TargetBitmap result;
     result.append(
-        cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
-    current_data_chunk_pos_ += real_batch_size;
+        cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
+    MoveCursor();
     return std::make_shared(std::move(result),
                             TargetBitmap(real_batch_size, true));
 }
@@ -619,8 +619,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArrayByKeyIndex() {
     }
     TargetBitmap result;
     result.append(
-        cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
-    current_data_chunk_pos_ += real_batch_size;
+        cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
+    MoveCursor();
     return std::make_shared(std::move(result),
                             TargetBitmap(real_batch_size, true));
 }
@@ -898,8 +898,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllByKeyIndex() {
     }
     TargetBitmap result;
     result.append(
-        cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
-    current_data_chunk_pos_ += real_batch_size;
+        cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
+    MoveCursor();
     return std::make_shared(std::move(result),
                             TargetBitmap(real_batch_size, true));
 }
@@ -1207,8 +1207,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffTypeByKeyIndex() {
     }
     TargetBitmap result;
     result.append(
-        cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
-    current_data_chunk_pos_ += real_batch_size;
+        cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
+    MoveCursor();
     return std::make_shared(std::move(result),
                             TargetBitmap(real_batch_size, true));
 }
@@ -1403,8 +1403,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArrayByKeyIndex() {
     }
     TargetBitmap result;
     result.append(
-        cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
-    current_data_chunk_pos_ += real_batch_size;
+        cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
+    MoveCursor();
     return std::make_shared(std::move(result),
                             TargetBitmap(real_batch_size, true));
 }
@@ -1687,8 +1687,8 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffTypeByKeyIndex() {
     }
     TargetBitmap result;
     result.append(
-        cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
-    current_data_chunk_pos_ += real_batch_size;
+        cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
+    MoveCursor();
     return std::make_shared(std::move(result),
                             TargetBitmap(real_batch_size, true));
 }
diff --git a/internal/core/src/exec/expression/TermExpr.cpp b/internal/core/src/exec/expression/TermExpr.cpp
index c25a30b715..93b8fc1050 100644
--- a/internal/core/src/exec/expression/TermExpr.cpp
+++ b/internal/core/src/exec/expression/TermExpr.cpp
@@ -680,8 +680,8 @@ PhyTermFilterExpr::ExecJsonInVariableByKeyIndex() {
 
     TargetBitmap result;
     result.append(
-        cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
-    current_data_chunk_pos_ += real_batch_size;
+        cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
+    MoveCursor();
     return std::make_shared(std::move(result),
                             TargetBitmap(real_batch_size, true));
 }
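Every one of these *ByKeyIndex call sites follows the same shape: the bitmap is produced once per segment, guarded by cached_index_chunk_id_, and each batch only slices a window out of it. A stripped-down sketch of that cache shape, with stand-in names and a dummy predicate in place of the JSON key stats scan (illustrative, not the Milvus types):

```cpp
// Sketch only: build the whole-segment result on first use, then let every
// later batch of the same expression reuse it. The sentinel mirrors the
// cached_index_chunk_id_ != 0 check visible in the hunks; the real build
// step queries the JSON key stats index and clones its bitset.
#include <cstdint>
#include <functional>
#include <vector>

struct CachedSegmentResult {
    int64_t built_flag = -1;  // stands in for cached_index_chunk_id_
    std::vector<bool> bits;   // stands in for cached_index_chunk_res_

    const std::vector<bool>&
    Get(int64_t segment_rows, const std::function<bool(int64_t)>& predicate) {
        if (built_flag != 0) {  // not built yet for this segment
            built_flag = 0;
            bits.assign(segment_rows, false);
            for (int64_t row = 0; row < segment_rows; ++row) {
                bits[row] = predicate(row);
            }
        }
        return bits;  // later batches only read a window out of this
    }
};
```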
diff --git a/internal/core/src/exec/expression/UnaryExpr.cpp b/internal/core/src/exec/expression/UnaryExpr.cpp
index 3407bee88d..d46d4cd941 100644
--- a/internal/core/src/exec/expression/UnaryExpr.cpp
+++ b/internal/core/src/exec/expression/UnaryExpr.cpp
@@ -319,9 +319,8 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(EvalCtx& context) {
     }
     int processed_cursor = 0;
     auto execute_sub_batch =
-        [op_type,
-         &processed_cursor,
-         &bitmap_input](
+        [ op_type, &processed_cursor, &
+          bitmap_input ](
            const milvus::ArrayView* data,
            const bool* valid_data,
            const int32_t* offsets,
@@ -330,152 +329,151 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(EvalCtx& context) {
            TargetBitmapView valid_res,
            ValueType val,
            int index) {
-            switch (op_type) {
-                case proto::plan::GreaterThan: {
-                    UnaryElementFuncForArray
-                        func;
-                    func(data,
-                         valid_data,
-                         size,
-                         val,
-                         index,
-                         res,
-                         valid_res,
-                         bitmap_input,
-                         processed_cursor,
-                         offsets);
-                    break;
-                }
-                case proto::plan::GreaterEqual: {
-                    UnaryElementFuncForArray
-                        func;
-                    func(data,
-                         valid_data,
-                         size,
-                         val,
-                         index,
-                         res,
-                         valid_res,
-                         bitmap_input,
-                         processed_cursor,
-                         offsets);
-                    break;
-                }
-                case proto::plan::LessThan: {
-                    UnaryElementFuncForArray
-                        func;
-                    func(data,
-                         valid_data,
-                         size,
-                         val,
-                         index,
-                         res,
-                         valid_res,
-                         bitmap_input,
-                         processed_cursor,
-                         offsets);
-                    break;
-                }
-                case proto::plan::LessEqual: {
-                    UnaryElementFuncForArray
-                        func;
-                    func(data,
-                         valid_data,
-                         size,
-                         val,
-                         index,
-                         res,
-                         valid_res,
-                         bitmap_input,
-                         processed_cursor,
-                         offsets);
-                    break;
-                }
-                case proto::plan::Equal: {
-                    UnaryElementFuncForArray
-                        func;
-                    func(data,
-                         valid_data,
-                         size,
-                         val,
-                         index,
-                         res,
-                         valid_res,
-                         bitmap_input,
-                         processed_cursor,
-                         offsets);
-                    break;
-                }
-                case proto::plan::NotEqual: {
-                    UnaryElementFuncForArray
-                        func;
-                    func(data,
-                         valid_data,
-                         size,
-                         val,
-                         index,
-                         res,
-                         valid_res,
-                         bitmap_input,
-                         processed_cursor,
-                         offsets);
-                    break;
-                }
-                case proto::plan::PrefixMatch: {
-                    UnaryElementFuncForArray
-                        func;
-                    func(data,
-                         valid_data,
-                         size,
-                         val,
-                         index,
-                         res,
-                         valid_res,
-                         bitmap_input,
-                         processed_cursor,
-                         offsets);
-                    break;
-                }
-                case proto::plan::Match: {
-                    UnaryElementFuncForArray
-                        func;
-                    func(data,
-                         valid_data,
-                         size,
-                         val,
-                         index,
-                         res,
-                         valid_res,
-                         bitmap_input,
-                         processed_cursor,
-                         offsets);
-                    break;
-                }
-                default:
-                    PanicInfo(
-                        OpTypeInvalid,
-                        fmt::format(
-                            "unsupported operator type for unary expr: {}",
-                            op_type));
+        switch (op_type) {
+            case proto::plan::GreaterThan: {
+                UnaryElementFuncForArray
+                    func;
+                func(data,
+                     valid_data,
+                     size,
+                     val,
+                     index,
+                     res,
+                     valid_res,
+                     bitmap_input,
+                     processed_cursor,
+                     offsets);
+                break;
             }
-            processed_cursor += size;
-        };
+            case proto::plan::GreaterEqual: {
+                UnaryElementFuncForArray
+                    func;
+                func(data,
+                     valid_data,
+                     size,
+                     val,
+                     index,
+                     res,
+                     valid_res,
+                     bitmap_input,
+                     processed_cursor,
+                     offsets);
+                break;
+            }
+            case proto::plan::LessThan: {
+                UnaryElementFuncForArray
+                    func;
+                func(data,
+                     valid_data,
+                     size,
+                     val,
+                     index,
+                     res,
+                     valid_res,
+                     bitmap_input,
+                     processed_cursor,
+                     offsets);
+                break;
+            }
+            case proto::plan::LessEqual: {
+                UnaryElementFuncForArray
+                    func;
+                func(data,
+                     valid_data,
+                     size,
+                     val,
+                     index,
+                     res,
+                     valid_res,
+                     bitmap_input,
+                     processed_cursor,
+                     offsets);
+                break;
+            }
+            case proto::plan::Equal: {
+                UnaryElementFuncForArray
+                    func;
+                func(data,
+                     valid_data,
+                     size,
+                     val,
+                     index,
+                     res,
+                     valid_res,
+                     bitmap_input,
+                     processed_cursor,
+                     offsets);
+                break;
+            }
+            case proto::plan::NotEqual: {
+                UnaryElementFuncForArray
+                    func;
+                func(data,
+                     valid_data,
+                     size,
+                     val,
+                     index,
+                     res,
+                     valid_res,
+                     bitmap_input,
+                     processed_cursor,
+                     offsets);
+                break;
+            }
+            case proto::plan::PrefixMatch: {
+                UnaryElementFuncForArray
+                    func;
+                func(data,
+                     valid_data,
+                     size,
+                     val,
+                     index,
+                     res,
+                     valid_res,
+                     bitmap_input,
+                     processed_cursor,
+                     offsets);
+                break;
+            }
+            case proto::plan::Match: {
+                UnaryElementFuncForArray
+                    func;
+                func(data,
+                     valid_data,
+                     size,
+                     val,
+                     index,
+                     res,
+                     valid_res,
+                     bitmap_input,
+                     processed_cursor,
+                     offsets);
+                break;
+            }
+            default:
+                PanicInfo(
+                    OpTypeInvalid,
+                    fmt::format("unsupported operator type for unary expr: {}",
+                                op_type));
+        }
+        processed_cursor += size;
+    };
     int64_t processed_size;
     if (has_offset_input_) {
         processed_size =
@@ -547,7 +545,7 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(EvalCtx& context,
         };
     } else {
         auto size_per_chunk = segment_->size_per_chunk();
-        retrieve = [ size_per_chunk, this ](int64_t offset) -> auto{
+        retrieve = [ size_per_chunk, this ](int64_t offset) -> auto {
            auto chunk_idx = offset / size_per_chunk;
            auto chunk_offset = offset % size_per_chunk;
            const auto& chunk =
@@ -680,18 +678,16 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(EvalCtx& context) {
     } while (false)
     int processed_cursor = 0;
-    auto execute_sub_batch = [op_type,
-                              pointer,
-                              &processed_cursor,
-                              &bitmap_input](
-                                 const milvus::Json* data,
-                                 const bool* valid_data,
-                                 const int32_t* offsets,
-                                 const int size,
-                                 TargetBitmapView res,
-                                 TargetBitmapView valid_res,
-                                 ExprValueType val) {
+    auto execute_sub_batch =
+        [ op_type, pointer, &processed_cursor, &
+          bitmap_input ](
+            const milvus::Json* data,
+            const bool* valid_data,
+            const int32_t* offsets,
+            const int size,
+            TargetBitmapView res,
+            TargetBitmapView valid_res,
+            ExprValueType val) {
         bool has_bitmap_input = !bitmap_input.empty();
         switch (op_type) {
             case proto::plan::GreaterThan: {
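A pattern worth calling out while reading these lambdas: they all capture bitmap_input and processed_cursor together, check has_bitmap_input, and bump processed_cursor by size after each sub-batch. A rough sketch of that handshake, assuming bitmap_input is a pre-computed filter spanning the whole batch (simplified signature; the real element functors also take validity data, offsets and template parameters):

```cpp
// Sketch only: res and bitmap_input both span the whole batch, while `data`
// is just the current sub-batch, so the shared cursor tells each call where
// its rows sit inside the batch-wide bitmaps.
#include <cstddef>
#include <vector>

void
ExecuteSubBatch(const int* data,                        // this sub-batch only
                int size,
                const std::vector<bool>& bitmap_input,  // filter for the whole batch
                size_t& processed_cursor,               // rows already handled
                std::vector<bool>& res,                 // result for the whole batch
                int val) {
    const bool has_bitmap_input = !bitmap_input.empty();
    for (int i = 0; i < size; ++i) {
        // rows the upstream bitmap already rejected are skipped
        if (has_bitmap_input && !bitmap_input[processed_cursor + i]) {
            continue;
        }
        res[processed_cursor + i] = data[i] > val;  // e.g. GreaterThan
    }
    processed_cursor += size;  // mirrors `processed_cursor += size;` above
}
```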
@@ -932,9 +928,10 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonForIndex() {
         std::conditional_t,
                            std::string_view,
                            ExprValueType>;
-    auto real_batch_size = current_data_chunk_pos_ + batch_size_ > active_count_
-                               ? active_count_ - current_data_chunk_pos_
-                               : batch_size_;
+    auto real_batch_size = GetNextBatchSize();
+    if (real_batch_size == 0) {
+        return nullptr;
+    }
     auto pointerpath = milvus::Json::pointer(expr_->column_.nested_path_);
     auto pointerpair = SplitAtFirstSlashDigit(pointerpath);
     std::string pointer = pointerpair.first;
@@ -1175,6 +1172,7 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonForIndex() {
     auto op_type = expr_->op_type_;
     if (cached_index_chunk_id_ != 0) {
+        cached_index_chunk_id_ = 0;
         const segcore::SegmentInternalInterface* segment = nullptr;
         if (segment_->type() == SegmentType::Growing) {
             segment =
@@ -1402,12 +1400,11 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJsonForIndex() {
                 is_strong_consistency,
                 filter_func)
                 .clone();
-        cached_index_chunk_id_ = 0;
     }
     TargetBitmap result;
     result.append(
-        cached_index_chunk_res_, current_data_chunk_pos_, real_batch_size);
-    current_data_chunk_pos_ += real_batch_size;
+        cached_index_chunk_res_, current_data_global_pos_, real_batch_size);
+    MoveCursor();
     return std::make_shared(std::move(result),
                             TargetBitmap(real_batch_size, true));
 }
@@ -1608,17 +1605,16 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(EvalCtx& context) {
     auto expr_type = expr_->op_type_;
     size_t processed_cursor = 0;
-    auto execute_sub_batch = [expr_type,
-                              &processed_cursor,
-                              &bitmap_input](
-                                 const T* data,
-                                 const bool* valid_data,
-                                 const int32_t* offsets,
-                                 const int size,
-                                 TargetBitmapView res,
-                                 TargetBitmapView valid_res,
-                                 IndexInnerType val) {
+    auto execute_sub_batch =
+        [ expr_type, &processed_cursor, &
+          bitmap_input ](
+            const T* data,
+            const bool* valid_data,
+            const int32_t* offsets,
+            const int size,
+            TargetBitmapView res,
+            TargetBitmapView valid_res,
+            IndexInnerType val) {
         switch (expr_type) {
             case proto::plan::GreaterThan: {
                 UnaryElementFunc func;
diff --git a/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp b/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp
index d069f4bae1..0f6dcd4883 100644
--- a/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp
+++ b/internal/core/src/index/JsonKeyStatsInvertedIndex.cpp
@@ -30,10 +30,7 @@ JsonKeyStatsInvertedIndex::AddJSONEncodeValue(
     uint16_t length,
     int32_t value,
     std::map>& mp) {
-    std::string key = "";
-    if (!paths.empty()) {
-        key = std::string("/") + Join(paths, "/");
-    }
+    std::string key = milvus::Json::pointer(paths);
     LOG_DEBUG(
         "insert inverted key: {}, flag: {}, type: {}, row_id: {}, offset: "
         "{}, length:{}, value:{}",
@@ -382,7 +379,8 @@ JsonKeyStatsInvertedIndex::Load(milvus::tracer::TraceContext ctx,
     disk_file_manager_->CacheJsonKeyIndexToDisk(index_files.value());
     AssertInfo(
         tantivy_index_exist(path_.c_str()), "index not exist: {}", path_);
-    wrapper_ = std::make_shared(path_.c_str(), milvus::index::SetBitset);
+    wrapper_ = std::make_shared(path_.c_str(),
+                                milvus::index::SetBitset);
     LOG_INFO("load json key index done for field id:{} with dir:{}",
              field_id_,
              path_);
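The removed lines spell out the key the stats index used to build by hand, so the intent of the one-line replacement is clear: milvus::Json::pointer(paths) is expected to yield the same JSON Pointer shape ("" for an empty path, "/a/b" otherwise), which keeps the index key consistent with the pointer the expression side builds from expr_->column_.nested_path_. An illustrative equivalent (not the Milvus implementation, which may additionally escape "~" and "/" per RFC 6901):

```cpp
// Sketch of the pointer construction the removed code performed explicitly.
#include <string>
#include <vector>

std::string
ToJsonPointer(const std::vector<std::string>& paths) {
    std::string key;              // stays "" when paths is empty
    for (const auto& p : paths) {
        key += "/";
        key += p;
    }
    return key;                   // {"a", "b"} -> "/a/b"
}
```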
diff --git a/internal/datacoord/job_manager.go b/internal/datacoord/job_manager.go
index 7f81fbbc08..77e77a9d26 100644
--- a/internal/datacoord/job_manager.go
+++ b/internal/datacoord/job_manager.go
@@ -296,7 +296,12 @@ func (jm *statsJobManager) SubmitStatsTask(originSegmentID, targetSegmentID int6
 	if err != nil {
 		return err
 	}
-	taskSlot := calculateStatsTaskSlot(originSegment.getSegmentSize())
+	originSegmentSize := originSegment.getSegmentSize()
+	if subJobType == indexpb.StatsSubJob_JsonKeyIndexJob {
+		originSegmentSize = originSegment.getSegmentSize() * 2
+	}
+
+	taskSlot := calculateStatsTaskSlot(originSegmentSize)
 	t := &indexpb.StatsTask{
 		CollectionID: originSegment.GetCollectionID(),
 		PartitionID:  originSegment.GetPartitionID(),