mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
related: #36380 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> - Core invariant: aggregation is centralized and schema-aware — all aggregate functions are created via the exec Aggregate registry (milvus::exec::Aggregate) and validated by ValidateAggFieldType, use a single in-memory accumulator layout (Accumulator/RowContainer) and grouping primitives (GroupingSet, HashTable, VectorHasher), ensuring consistent typing, null semantics and offsets across planner → exec → reducer conversion paths (toAggregateInfo, Aggregate::create, GroupingSet, AggResult converters). - Removed / simplified logic: removed ad‑hoc count/group-by and reducer code (CountNode/PhyCountNode, GroupByNode/PhyGroupByNode, cntReducer and its tests) and consolidated into a unified AggregationNode → PhyAggregationNode + GroupingSet + HashTable execution path and centralized reducers (MilvusAggReducer, InternalAggReducer, SegcoreAggReducer). AVG now implemented compositionally (SUM + COUNT) rather than a bespoke operator, eliminating duplicate implementations. - Why this does NOT cause data loss or regressions: existing data-access and serialization paths are preserved and explicitly validated — bulk_subscript / bulk_script_field_data and FieldData creation are used for output materialization; converters (InternalResult2AggResult ↔ AggResult2internalResult, SegcoreResults2AggResult ↔ AggResult2segcoreResult) enforce shape/type/row-count validation; proxy and plan-level checks (MatchAggregationExpression, translateOutputFields, ValidateAggFieldType, translateGroupByFieldIds) reject unsupported inputs (ARRAY/JSON, unsupported datatypes) early. Empty-result generation and explicit error returns guard against silent corruption. 
- New capability and scope: end-to-end GROUP BY and aggregation support added across the stack — proto (plan.proto, RetrieveRequest fields group_by_field_ids/aggregates), planner nodes (AggregationNode, ProjectNode, SearchGroupByNode), exec operators (PhyAggregationNode, PhyProjectNode) and aggregation core (Aggregate implementations: Sum/Count/Min/Max, SimpleNumericAggregate, RowContainer, GroupingSet, HashTable) plus proxy/querynode reducers and tests — enabling grouped and global aggregation (sum, count, min, max, avg via sum+count) with schema-aware validation and reduction. <!-- end of auto-generated comment: release notes by coderabbit.ai --> Signed-off-by: MrPresent-Han <chun.han@gmail.com> Co-authored-by: MrPresent-Han <chun.han@gmail.com>
236 lines
5.1 KiB
Go
236 lines
5.1 KiB
Go
package agg
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/planpb"
|
|
)
|
|
|
|
// SumAggregate describes a sum aggregation over a single field.
type SumAggregate struct {
	// fieldID is the ID of the aggregated field. It is returned by FieldID
	// and written into the FieldId of the protobuf message built by ToPB.
	fieldID int64
	// originalName is returned verbatim by OriginalName.
	// NOTE(review): presumably the aggregate expression as the user wrote
	// it — confirm against the code that constructs this type.
	originalName string
	// NOTE(review): isAvg presumably marks a sum that was created as one
	// half of an AVG (sum + count) decomposition — confirm against the
	// constructor; the accessor methods of this type do not read it.
	isAvg bool
}
|
|
|
|
func (sum *SumAggregate) Name() string {
|
|
return kSum
|
|
}
|
|
|
|
func (sum *SumAggregate) Update(target *FieldValue, new *FieldValue) error {
|
|
return AccumulateFieldValue(target, new)
|
|
}
|
|
|
|
func (sum *SumAggregate) ToPB() *planpb.Aggregate {
|
|
return &planpb.Aggregate{Op: planpb.AggregateOp_sum, FieldId: sum.FieldID()}
|
|
}
|
|
|
|
func (sum *SumAggregate) FieldID() int64 {
|
|
return sum.fieldID
|
|
}
|
|
|
|
func (sum *SumAggregate) OriginalName() string {
|
|
return sum.originalName
|
|
}
|
|
|
|
// CountAggregate describes a count aggregation over a single field.
type CountAggregate struct {
	// fieldID is the ID of the counted field. It is returned by FieldID and
	// written into the FieldId of the protobuf message built by ToPB.
	fieldID int64
	// originalName is returned verbatim by OriginalName.
	// NOTE(review): presumably the aggregate expression as the user wrote
	// it — confirm against the code that constructs this type.
	originalName string
	// NOTE(review): isAvg presumably marks a count that was created as one
	// half of an AVG (sum + count) decomposition — confirm against the
	// constructor; the accessor methods of this type do not read it.
	isAvg bool
}
|
|
|
|
func (count *CountAggregate) Name() string {
|
|
return kCount
|
|
}
|
|
|
|
func (count *CountAggregate) Update(target *FieldValue, new *FieldValue) error {
|
|
return AccumulateFieldValue(target, new)
|
|
}
|
|
|
|
func (count *CountAggregate) ToPB() *planpb.Aggregate {
|
|
return &planpb.Aggregate{Op: planpb.AggregateOp_count, FieldId: count.FieldID()}
|
|
}
|
|
|
|
func (count *CountAggregate) FieldID() int64 {
|
|
return count.fieldID
|
|
}
|
|
|
|
func (count *CountAggregate) OriginalName() string {
|
|
return count.originalName
|
|
}
|
|
|
|
// MinAggregate describes a minimum aggregation over a single field.
type MinAggregate struct {
	// fieldID is the ID of the aggregated field. It is returned by FieldID
	// and written into the FieldId of the protobuf message built by ToPB.
	fieldID int64
	// originalName is returned verbatim by OriginalName.
	// NOTE(review): presumably the aggregate expression as the user wrote
	// it — confirm against the code that constructs this type.
	originalName string
}
|
|
|
|
func (min *MinAggregate) Name() string {
|
|
return kMin
|
|
}
|
|
|
|
func (min *MinAggregate) Update(target *FieldValue, new *FieldValue) error {
|
|
if target == nil || new == nil {
|
|
return fmt.Errorf("target or new field value is nil")
|
|
}
|
|
|
|
// Handle nil `val` for initialization
|
|
if target.val == nil {
|
|
target.val = new.val
|
|
return nil
|
|
}
|
|
|
|
// Compare and keep the minimum value
|
|
switch targetVal := target.val.(type) {
|
|
case int:
|
|
newVal, ok := new.val.(int)
|
|
if !ok {
|
|
return fmt.Errorf("type mismatch: target is int, new is %T", new.val)
|
|
}
|
|
if newVal < targetVal {
|
|
target.val = newVal
|
|
}
|
|
case int32:
|
|
newVal, ok := new.val.(int32)
|
|
if !ok {
|
|
return fmt.Errorf("type mismatch: target is int32, new is %T", new.val)
|
|
}
|
|
if newVal < targetVal {
|
|
target.val = newVal
|
|
}
|
|
case int64:
|
|
newVal, ok := new.val.(int64)
|
|
if !ok {
|
|
return fmt.Errorf("type mismatch: target is int64, new is %T", new.val)
|
|
}
|
|
if newVal < targetVal {
|
|
target.val = newVal
|
|
}
|
|
case float32:
|
|
newVal, ok := new.val.(float32)
|
|
if !ok {
|
|
return fmt.Errorf("type mismatch: target is float32, new is %T", new.val)
|
|
}
|
|
if newVal < targetVal {
|
|
target.val = newVal
|
|
}
|
|
case float64:
|
|
newVal, ok := new.val.(float64)
|
|
if !ok {
|
|
return fmt.Errorf("type mismatch: target is float64, new is %T", new.val)
|
|
}
|
|
if newVal < targetVal {
|
|
target.val = newVal
|
|
}
|
|
case string:
|
|
newVal, ok := new.val.(string)
|
|
if !ok {
|
|
return fmt.Errorf("type mismatch: target is string, new is %T", new.val)
|
|
}
|
|
if newVal < targetVal {
|
|
target.val = newVal
|
|
}
|
|
default:
|
|
return fmt.Errorf("unsupported type for min aggregation: %T", target.val)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (min *MinAggregate) ToPB() *planpb.Aggregate {
|
|
return &planpb.Aggregate{Op: planpb.AggregateOp_min, FieldId: min.FieldID()}
|
|
}
|
|
|
|
func (min *MinAggregate) FieldID() int64 {
|
|
return min.fieldID
|
|
}
|
|
|
|
func (min *MinAggregate) OriginalName() string {
|
|
return min.originalName
|
|
}
|
|
|
|
// MaxAggregate describes a maximum aggregation over a single field.
type MaxAggregate struct {
	// fieldID is the ID of the aggregated field. It is returned by FieldID
	// and written into the FieldId of the protobuf message built by ToPB.
	fieldID int64
	// originalName is returned verbatim by OriginalName.
	// NOTE(review): presumably the aggregate expression as the user wrote
	// it — confirm against the code that constructs this type.
	originalName string
}
|
|
|
|
func (max *MaxAggregate) Name() string {
|
|
return kMax
|
|
}
|
|
|
|
func (max *MaxAggregate) Update(target *FieldValue, new *FieldValue) error {
|
|
if target == nil || new == nil {
|
|
return fmt.Errorf("target or new field value is nil")
|
|
}
|
|
|
|
// Handle nil `val` for initialization
|
|
if target.val == nil {
|
|
target.val = new.val
|
|
return nil
|
|
}
|
|
|
|
// Compare and keep the maximum value
|
|
switch targetVal := target.val.(type) {
|
|
case int:
|
|
newVal, ok := new.val.(int)
|
|
if !ok {
|
|
return fmt.Errorf("type mismatch: target is int, new is %T", new.val)
|
|
}
|
|
if newVal > targetVal {
|
|
target.val = newVal
|
|
}
|
|
case int32:
|
|
newVal, ok := new.val.(int32)
|
|
if !ok {
|
|
return fmt.Errorf("type mismatch: target is int32, new is %T", new.val)
|
|
}
|
|
if newVal > targetVal {
|
|
target.val = newVal
|
|
}
|
|
case int64:
|
|
newVal, ok := new.val.(int64)
|
|
if !ok {
|
|
return fmt.Errorf("type mismatch: target is int64, new is %T", new.val)
|
|
}
|
|
if newVal > targetVal {
|
|
target.val = newVal
|
|
}
|
|
case float32:
|
|
newVal, ok := new.val.(float32)
|
|
if !ok {
|
|
return fmt.Errorf("type mismatch: target is float32, new is %T", new.val)
|
|
}
|
|
if newVal > targetVal {
|
|
target.val = newVal
|
|
}
|
|
case float64:
|
|
newVal, ok := new.val.(float64)
|
|
if !ok {
|
|
return fmt.Errorf("type mismatch: target is float64, new is %T", new.val)
|
|
}
|
|
if newVal > targetVal {
|
|
target.val = newVal
|
|
}
|
|
case string:
|
|
newVal, ok := new.val.(string)
|
|
if !ok {
|
|
return fmt.Errorf("type mismatch: target is string, new is %T", new.val)
|
|
}
|
|
if newVal > targetVal {
|
|
target.val = newVal
|
|
}
|
|
default:
|
|
return fmt.Errorf("unsupported type for max aggregation: %T", target.val)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (max *MaxAggregate) ToPB() *planpb.Aggregate {
|
|
return &planpb.Aggregate{Op: planpb.AggregateOp_max, FieldId: max.FieldID()}
|
|
}
|
|
|
|
func (max *MaxAggregate) FieldID() int64 {
|
|
return max.fieldID
|
|
}
|
|
|
|
func (max *MaxAggregate) OriginalName() string {
|
|
return max.originalName
|
|
}
|