mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
related: #36380 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> - Core invariant: aggregation is centralized and schema-aware — all aggregate functions are created via the exec Aggregate registry (milvus::exec::Aggregate) and validated by ValidateAggFieldType, use a single in-memory accumulator layout (Accumulator/RowContainer) and grouping primitives (GroupingSet, HashTable, VectorHasher), ensuring consistent typing, null semantics and offsets across planner → exec → reducer conversion paths (toAggregateInfo, Aggregate::create, GroupingSet, AggResult converters). - Removed / simplified logic: removed ad‑hoc count/group-by and reducer code (CountNode/PhyCountNode, GroupByNode/PhyGroupByNode, cntReducer and its tests) and consolidated into a unified AggregationNode → PhyAggregationNode + GroupingSet + HashTable execution path and centralized reducers (MilvusAggReducer, InternalAggReducer, SegcoreAggReducer). AVG now implemented compositionally (SUM + COUNT) rather than a bespoke operator, eliminating duplicate implementations. - Why this does NOT cause data loss or regressions: existing data-access and serialization paths are preserved and explicitly validated — bulk_subscript / bulk_script_field_data and FieldData creation are used for output materialization; converters (InternalResult2AggResult ↔ AggResult2internalResult, SegcoreResults2AggResult ↔ AggResult2segcoreResult) enforce shape/type/row-count validation; proxy and plan-level checks (MatchAggregationExpression, translateOutputFields, ValidateAggFieldType, translateGroupByFieldIds) reject unsupported inputs (ARRAY/JSON, unsupported datatypes) early. Empty-result generation and explicit error returns guard against silent corruption. - New capability and scope: end-to-end GROUP BY and aggregation support added across the stack — proto (plan.proto, RetrieveRequest fields group_by_field_ids/aggregates), planner nodes (AggregationNode, ProjectNode, SearchGroupByNode), exec operators (PhyAggregationNode, PhyProjectNode) and aggregation core (Aggregate implementations: Sum/Count/Min/Max, SimpleNumericAggregate, RowContainer, GroupingSet, HashTable) plus proxy/querynode reducers and tests — enabling grouped and global aggregation (sum, count, min, max, avg via sum+count) with schema-aware validation and reduction. <!-- end of auto-generated comment: release notes by coderabbit.ai --> Signed-off-by: MrPresent-Han <chun.han@gmail.com> Co-authored-by: MrPresent-Han <chun.han@gmail.com>
658 lines
17 KiB
Go
658 lines
17 KiB
Go
package segments
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/suite"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
|
"github.com/milvus-io/milvus/pkg/v2/log"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/planpb"
|
|
"github.com/milvus-io/milvus/pkg/v2/proto/segcorepb"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
|
)
|
|
|
|
type AggReduceSuite struct {
|
|
suite.Suite
|
|
}
|
|
|
|
func (s *AggReduceSuite) SetupSuite() {
|
|
paramtable.Init()
|
|
}
|
|
|
|
func (s *AggReduceSuite) TestSegCoreAggReduceSingleColumn() {
|
|
groupByFieldIds := make([]int64, 1)
|
|
groupByFieldIds[0] = 101
|
|
aggregates := make([]*planpb.Aggregate, 1)
|
|
aggregates[0] = &planpb.Aggregate{
|
|
Op: planpb.AggregateOp_sum,
|
|
FieldId: 102,
|
|
}
|
|
|
|
// Create a minimal schema for validation
|
|
schema := &schemapb.CollectionSchema{
|
|
Name: "test_collection",
|
|
Fields: []*schemapb.FieldSchema{
|
|
{
|
|
FieldID: 101,
|
|
Name: "field101",
|
|
DataType: schemapb.DataType_Int16,
|
|
IsPrimaryKey: false,
|
|
},
|
|
{
|
|
FieldID: 102,
|
|
Name: "field102",
|
|
DataType: schemapb.DataType_Int64,
|
|
IsPrimaryKey: false,
|
|
},
|
|
},
|
|
}
|
|
|
|
aggReducer := NewSegcoreAggReducer(groupByFieldIds, aggregates, 10, schema)
|
|
results := make([]*segcorepb.RetrieveResults, 2)
|
|
{
|
|
fieldData1 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int16,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_IntData{
|
|
IntData: &schemapb.IntArray{
|
|
Data: []int32{2, 3, 4, 8, 11},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
fieldData2 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int64,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_LongData{
|
|
LongData: &schemapb.LongArray{
|
|
Data: []int64{12, 33, 24, 48, 11},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
results[0] = &segcorepb.RetrieveResults{
|
|
FieldsData: []*schemapb.FieldData{fieldData1, fieldData2},
|
|
}
|
|
}
|
|
{
|
|
fieldData1 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int16,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_IntData{
|
|
IntData: &schemapb.IntArray{
|
|
Data: []int32{2, 3, 5, 9, 11},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
fieldData2 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int64,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_LongData{
|
|
LongData: &schemapb.LongArray{
|
|
Data: []int64{12, 33, 15, 18, 11},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
results[1] = &segcorepb.RetrieveResults{
|
|
FieldsData: []*schemapb.FieldData{fieldData1, fieldData2},
|
|
}
|
|
}
|
|
|
|
reducedRes, err := aggReducer.Reduce(context.Background(), results, nil, nil)
|
|
s.NoError(err)
|
|
s.NotNil(reducedRes)
|
|
|
|
actualGroupsKeys := reducedRes.GetFieldsData()[0].GetScalars().GetIntData().GetData()
|
|
actualAggs := reducedRes.GetFieldsData()[1].GetScalars().GetLongData().GetData()
|
|
groupLen := len(actualGroupsKeys)
|
|
aggLen := len(actualAggs)
|
|
s.Equal(groupLen, aggLen)
|
|
expectGroupAggMap := map[int32]int64{2: 24, 3: 66, 4: 24, 8: 48, 11: 22, 5: 15, 9: 18}
|
|
s.Equal(groupLen, len(expectGroupAggMap))
|
|
|
|
for i := 0; i < groupLen; i++ {
|
|
groupKey := actualGroupsKeys[i]
|
|
actualAgg := actualAggs[i]
|
|
expectAggVal, exist := expectGroupAggMap[groupKey]
|
|
s.True(exist)
|
|
s.Equal(expectAggVal, actualAgg)
|
|
}
|
|
}
|
|
|
|
func (s *AggReduceSuite) TestSegCoreAggReduceMultiColumn() {
|
|
groupByFieldIds := make([]int64, 2)
|
|
groupByFieldIds[0] = 101
|
|
groupByFieldIds[1] = 102
|
|
aggregates := make([]*planpb.Aggregate, 1)
|
|
aggregates[0] = &planpb.Aggregate{
|
|
Op: planpb.AggregateOp_sum,
|
|
FieldId: 103,
|
|
}
|
|
|
|
// Create a minimal schema for validation
|
|
schema := &schemapb.CollectionSchema{
|
|
Name: "test_collection",
|
|
Fields: []*schemapb.FieldSchema{
|
|
{
|
|
FieldID: 101,
|
|
Name: "field101",
|
|
DataType: schemapb.DataType_Int16,
|
|
IsPrimaryKey: false,
|
|
},
|
|
{
|
|
FieldID: 102,
|
|
Name: "field102",
|
|
DataType: schemapb.DataType_VarChar,
|
|
IsPrimaryKey: false,
|
|
},
|
|
{
|
|
FieldID: 103,
|
|
Name: "field103",
|
|
DataType: schemapb.DataType_Int64,
|
|
IsPrimaryKey: false,
|
|
},
|
|
},
|
|
}
|
|
|
|
aggReducer := NewSegcoreAggReducer(groupByFieldIds, aggregates, 10, schema)
|
|
results := make([]*segcorepb.RetrieveResults, 2)
|
|
{
|
|
fieldData1 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int16,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_IntData{
|
|
IntData: &schemapb.IntArray{
|
|
Data: []int32{2, 3, 4, 8, 11},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
fieldData2 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_VarChar,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_StringData{
|
|
StringData: &schemapb.StringArray{
|
|
Data: []string{"a", "b", "c", "d", "e"},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
fieldData3 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int64,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_LongData{
|
|
LongData: &schemapb.LongArray{
|
|
Data: []int64{12, 33, 24, 48, 11},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
results[0] = &segcorepb.RetrieveResults{
|
|
FieldsData: []*schemapb.FieldData{fieldData1, fieldData2, fieldData3},
|
|
}
|
|
}
|
|
{
|
|
fieldData1 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int16,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_IntData{
|
|
IntData: &schemapb.IntArray{
|
|
Data: []int32{2, 3, 5, 9, 11},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
fieldData2 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_VarChar,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_StringData{
|
|
StringData: &schemapb.StringArray{
|
|
Data: []string{"b", "c", "e", "f", "g"},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
fieldData3 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int64,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_LongData{
|
|
LongData: &schemapb.LongArray{
|
|
Data: []int64{12, 33, 15, 18, 11},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
results[1] = &segcorepb.RetrieveResults{
|
|
FieldsData: []*schemapb.FieldData{fieldData1, fieldData2, fieldData3},
|
|
}
|
|
}
|
|
|
|
reducedRes, err := aggReducer.Reduce(context.Background(), results, nil, nil)
|
|
s.NoError(err)
|
|
s.NotNil(reducedRes)
|
|
log.Info("reduce:", zap.Any("reducedRes", reducedRes))
|
|
type Pair struct {
|
|
key1 int32
|
|
key2 string
|
|
}
|
|
expectedMap := map[Pair]int64{
|
|
{key1: 2, key2: "a"}: 12,
|
|
{key1: 3, key2: "b"}: 33,
|
|
{key1: 4, key2: "c"}: 24,
|
|
{key1: 8, key2: "d"}: 48,
|
|
{key1: 11, key2: "e"}: 11,
|
|
{key1: 2, key2: "b"}: 12,
|
|
{key1: 3, key2: "c"}: 33,
|
|
{key1: 5, key2: "e"}: 15,
|
|
{key1: 9, key2: "f"}: 18,
|
|
{key1: 11, key2: "g"}: 11,
|
|
}
|
|
|
|
actualGroupsKeys1 := reducedRes.GetFieldsData()[0].GetScalars().GetIntData().GetData()
|
|
actualGroupsKeys2 := reducedRes.GetFieldsData()[1].GetScalars().GetStringData().GetData()
|
|
actualAggs := reducedRes.GetFieldsData()[2].GetScalars().GetLongData().GetData()
|
|
groupLen := len(actualGroupsKeys1)
|
|
aggLen := len(actualAggs)
|
|
s.Equal(groupLen, aggLen)
|
|
s.Equal(groupLen, len(actualGroupsKeys2))
|
|
s.Equal(groupLen, len(expectedMap))
|
|
|
|
for i := 0; i < groupLen; i++ {
|
|
actualGroupKey1 := actualGroupsKeys1[i]
|
|
actualGroupKey2 := actualGroupsKeys2[i]
|
|
actualAgg := actualAggs[i]
|
|
keysPair := Pair{key1: actualGroupKey1, key2: actualGroupKey2}
|
|
expectAggVal, exist := expectedMap[keysPair]
|
|
s.True(exist)
|
|
s.Equal(expectAggVal, actualAgg)
|
|
}
|
|
}
|
|
|
|
func (s *AggReduceSuite) TestSegCoreAggReduceWrongRowCount() {
|
|
groupByFieldIds := make([]int64, 1)
|
|
groupByFieldIds[0] = 101
|
|
aggregates := make([]*planpb.Aggregate, 1)
|
|
aggregates[0] = &planpb.Aggregate{
|
|
Op: planpb.AggregateOp_sum,
|
|
FieldId: 102,
|
|
}
|
|
|
|
// Create a minimal schema for validation
|
|
schema := &schemapb.CollectionSchema{
|
|
Name: "test_collection",
|
|
Fields: []*schemapb.FieldSchema{
|
|
{
|
|
FieldID: 101,
|
|
Name: "field101",
|
|
DataType: schemapb.DataType_Int16,
|
|
IsPrimaryKey: false,
|
|
},
|
|
{
|
|
FieldID: 102,
|
|
Name: "field102",
|
|
DataType: schemapb.DataType_Int64,
|
|
IsPrimaryKey: false,
|
|
},
|
|
},
|
|
}
|
|
|
|
aggReducer := NewSegcoreAggReducer(groupByFieldIds, aggregates, 10, schema)
|
|
results := make([]*segcorepb.RetrieveResults, 2)
|
|
// should report error when
|
|
// field data's lengths are different
|
|
{
|
|
fieldData1 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int16,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_IntData{
|
|
IntData: &schemapb.IntArray{
|
|
Data: []int32{2, 3, 4, 8, 11},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
fieldData2 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int64,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_LongData{
|
|
LongData: &schemapb.LongArray{
|
|
Data: []int64{12, 33},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
results[0] = &segcorepb.RetrieveResults{
|
|
FieldsData: []*schemapb.FieldData{fieldData1, fieldData2},
|
|
}
|
|
}
|
|
{
|
|
fieldData1 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int16,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_IntData{
|
|
IntData: &schemapb.IntArray{
|
|
Data: []int32{2, 3, 5, 9, 11},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
fieldData2 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int64,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_LongData{
|
|
LongData: &schemapb.LongArray{
|
|
Data: []int64{12},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
results[1] = &segcorepb.RetrieveResults{
|
|
FieldsData: []*schemapb.FieldData{fieldData1, fieldData2},
|
|
}
|
|
}
|
|
|
|
reducedRes, err := aggReducer.Reduce(context.Background(), results, nil, nil)
|
|
s.Error(err)
|
|
s.Nil(reducedRes)
|
|
log.Info("err:", zap.Any("err", err))
|
|
}
|
|
|
|
func (s *AggReduceSuite) TestSegCoreAggReduceNilResult() {
|
|
groupByFieldIds := make([]int64, 1)
|
|
groupByFieldIds[0] = 101
|
|
aggregates := make([]*planpb.Aggregate, 1)
|
|
aggregates[0] = &planpb.Aggregate{
|
|
Op: planpb.AggregateOp_sum,
|
|
FieldId: 102,
|
|
}
|
|
|
|
// Create a minimal schema for validation
|
|
schema := &schemapb.CollectionSchema{
|
|
Name: "test_collection",
|
|
Fields: []*schemapb.FieldSchema{
|
|
{
|
|
FieldID: 101,
|
|
Name: "field101",
|
|
DataType: schemapb.DataType_Int16,
|
|
IsPrimaryKey: false,
|
|
},
|
|
{
|
|
FieldID: 102,
|
|
Name: "field102",
|
|
DataType: schemapb.DataType_Int64,
|
|
IsPrimaryKey: false,
|
|
},
|
|
},
|
|
}
|
|
|
|
aggReducer := NewSegcoreAggReducer(groupByFieldIds, aggregates, 10, schema)
|
|
results := make([]*segcorepb.RetrieveResults, 2)
|
|
{
|
|
fieldData1 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int16,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_IntData{
|
|
IntData: &schemapb.IntArray{
|
|
Data: []int32{2, 3, 4, 8, 11},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
fieldData2 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int64,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_LongData{
|
|
LongData: &schemapb.LongArray{
|
|
Data: []int64{12, 33, 24, 24, 33},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
results[0] = &segcorepb.RetrieveResults{
|
|
FieldsData: []*schemapb.FieldData{fieldData1, fieldData2},
|
|
}
|
|
}
|
|
results[1] = nil
|
|
|
|
reducedRes, err := aggReducer.Reduce(context.Background(), results, nil, nil)
|
|
log.Info("err:", zap.Any("err", err))
|
|
s.Error(err)
|
|
s.Nil(reducedRes)
|
|
}
|
|
|
|
func (s *AggReduceSuite) TestSegCoreAggReduceInnerNil() {
|
|
groupByFieldIds := make([]int64, 1)
|
|
groupByFieldIds[0] = 101
|
|
aggregates := make([]*planpb.Aggregate, 1)
|
|
aggregates[0] = &planpb.Aggregate{
|
|
Op: planpb.AggregateOp_sum,
|
|
FieldId: 102,
|
|
}
|
|
|
|
// Create a minimal schema for validation
|
|
schema := &schemapb.CollectionSchema{
|
|
Name: "test_collection",
|
|
Fields: []*schemapb.FieldSchema{
|
|
{
|
|
FieldID: 101,
|
|
Name: "field101",
|
|
DataType: schemapb.DataType_Int16,
|
|
IsPrimaryKey: false,
|
|
},
|
|
{
|
|
FieldID: 102,
|
|
Name: "field102",
|
|
DataType: schemapb.DataType_Int64,
|
|
IsPrimaryKey: false,
|
|
},
|
|
},
|
|
}
|
|
|
|
aggReducer := NewSegcoreAggReducer(groupByFieldIds, aggregates, 10, schema)
|
|
results := make([]*segcorepb.RetrieveResults, 2)
|
|
{
|
|
fieldData1 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int16,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_IntData{
|
|
IntData: &schemapb.IntArray{
|
|
Data: []int32{2, 3, 4, 8, 11},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
fieldData2 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int64,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_LongData{
|
|
LongData: &schemapb.LongArray{
|
|
Data: nil,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
results[0] = &segcorepb.RetrieveResults{
|
|
FieldsData: []*schemapb.FieldData{fieldData1, fieldData2},
|
|
}
|
|
}
|
|
{
|
|
fieldData1 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int16,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_IntData{
|
|
IntData: &schemapb.IntArray{
|
|
Data: []int32{2, 3, 5, 9, 11},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
fieldData2 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int64,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_LongData{
|
|
LongData: &schemapb.LongArray{
|
|
Data: []int64{12, 33, 15, 18, 11},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
results[1] = &segcorepb.RetrieveResults{
|
|
FieldsData: []*schemapb.FieldData{fieldData1, fieldData2},
|
|
}
|
|
}
|
|
reducedRes, err := aggReducer.Reduce(context.Background(), results, nil, nil)
|
|
log.Info("err:", zap.Any("err", err))
|
|
s.Error(err)
|
|
s.Nil(reducedRes)
|
|
}
|
|
|
|
func (s *AggReduceSuite) TestSegCoreAggReduceGlobalAgg() {
|
|
groupByFieldIds := make([]int64, 0)
|
|
aggregates := make([]*planpb.Aggregate, 2)
|
|
aggregates[0] = &planpb.Aggregate{
|
|
Op: planpb.AggregateOp_sum,
|
|
FieldId: 102,
|
|
}
|
|
aggregates[1] = &planpb.Aggregate{
|
|
Op: planpb.AggregateOp_sum,
|
|
FieldId: 103,
|
|
}
|
|
|
|
// Create a minimal schema for validation
|
|
schema := &schemapb.CollectionSchema{
|
|
Name: "test_collection",
|
|
Fields: []*schemapb.FieldSchema{
|
|
{
|
|
FieldID: 102,
|
|
Name: "field102",
|
|
DataType: schemapb.DataType_Int64,
|
|
IsPrimaryKey: false,
|
|
},
|
|
{
|
|
FieldID: 103,
|
|
Name: "field103",
|
|
DataType: schemapb.DataType_Int64,
|
|
IsPrimaryKey: false,
|
|
},
|
|
},
|
|
}
|
|
|
|
aggReducer := NewSegcoreAggReducer(groupByFieldIds, aggregates, 10, schema)
|
|
results := make([]*segcorepb.RetrieveResults, 2)
|
|
{
|
|
fieldData1 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int64,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_LongData{
|
|
LongData: &schemapb.LongArray{
|
|
Data: []int64{40},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
fieldData2 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int64,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_LongData{
|
|
LongData: &schemapb.LongArray{
|
|
Data: []int64{120},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
results[0] = &segcorepb.RetrieveResults{
|
|
FieldsData: []*schemapb.FieldData{fieldData1, fieldData2},
|
|
}
|
|
}
|
|
{
|
|
fieldData1 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int64,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_LongData{
|
|
LongData: &schemapb.LongArray{
|
|
Data: []int64{420},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
fieldData2 := &schemapb.FieldData{
|
|
Type: schemapb.DataType_Int64,
|
|
Field: &schemapb.FieldData_Scalars{
|
|
Scalars: &schemapb.ScalarField{
|
|
Data: &schemapb.ScalarField_LongData{
|
|
LongData: &schemapb.LongArray{
|
|
Data: []int64{130},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
results[1] = &segcorepb.RetrieveResults{
|
|
FieldsData: []*schemapb.FieldData{fieldData1, fieldData2},
|
|
}
|
|
}
|
|
reducedRes, err := aggReducer.Reduce(context.Background(), results, nil, nil)
|
|
s.NoError(err)
|
|
s.NotNil(reducedRes)
|
|
s.Equal(2, len(reducedRes.GetFieldsData()))
|
|
s.Equal(1, len(reducedRes.GetFieldsData()[0].GetScalars().GetLongData().GetData()))
|
|
s.Equal(1, len(reducedRes.GetFieldsData()[1].GetScalars().GetLongData().GetData()))
|
|
s.Equal(int64(460), reducedRes.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0])
|
|
s.Equal(int64(250), reducedRes.GetFieldsData()[1].GetScalars().GetLongData().GetData()[0])
|
|
}
|
|
|
|
func TestAggReduce(t *testing.T) {
|
|
suite.Run(t, new(AggReduceSuite))
|
|
}
|