From 8a6e1a4b277c77d3f0d9e678a95adc00c1f6ba7c Mon Sep 17 00:00:00 2001 From: congqixia Date: Tue, 9 Jan 2024 15:48:55 +0800 Subject: [PATCH] enhance: pre-allocate result FieldData space to reduce copy & growslice (#29726) See also: #29113 Add a new utility function in `pkg/util/typeutil` to pre-allocate field data slice capacity according to the search limit. This shall avoid copying the data during `AppendFieldData` when the previous slice is out of space. It shall also save CPU time under high payload. --------- Signed-off-by: Congqi Xia --- internal/proxy/task_search.go | 6 +- pkg/util/typeutil/schema.go | 100 ++++++ pkg/util/typeutil/schema_test.go | 303 +++++++++++++++++++ tests/integration/jsonexpr/json_expr_test.go | 32 +- 4 files changed, 422 insertions(+), 19 deletions(-) diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index 8f1c3fd203..88d1019882 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -850,7 +850,7 @@ func reduceSearchResultData(ctx context.Context, subSearchResultData []*schemapb Results: &schemapb.SearchResultData{ NumQueries: nq, TopK: topk, - FieldsData: make([]*schemapb.FieldData, len(subSearchResultData[0].FieldsData)), + FieldsData: typeutil.PrepareResultFieldData(subSearchResultData[0].GetFieldsData(), limit), Scores: []float32{}, Ids: &schemapb.IDs{}, Topks: []int64{}, @@ -861,13 +861,13 @@ func reduceSearchResultData(ctx context.Context, subSearchResultData []*schemapb case schemapb.DataType_Int64: ret.GetResults().Ids.IdField = &schemapb.IDs_IntId{ IntId: &schemapb.LongArray{ - Data: make([]int64, 0), + Data: make([]int64, 0, limit), }, } case schemapb.DataType_VarChar: ret.GetResults().Ids.IdField = &schemapb.IDs_StrId{ StrId: &schemapb.StringArray{ - Data: make([]string, 0), + Data: make([]string, 0, limit), }, } default: diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index 2e816d2b4a..4a25b400b1 100644 --- a/pkg/util/typeutil/schema.go +++ 
b/pkg/util/typeutil/schema.go @@ -383,6 +383,106 @@ func IsVariableDataType(dataType schemapb.DataType) bool { return IsStringType(dataType) || IsArrayType(dataType) || IsJSONType(dataType) } +// PrepareResultFieldData constructs the slice of FieldData for the final result reduce. +// It preallocates the space for each field data internal slice to prevent slice growing cost. +func PrepareResultFieldData(sample []*schemapb.FieldData, topK int64) []*schemapb.FieldData { + result := make([]*schemapb.FieldData, 0, len(sample)) + for _, fieldData := range sample { + fd := &schemapb.FieldData{ + Type: fieldData.Type, + FieldName: fieldData.FieldName, + FieldId: fieldData.FieldId, + IsDynamic: fieldData.IsDynamic, + } + switch fieldType := fieldData.Field.(type) { + case *schemapb.FieldData_Scalars: + scalarField := fieldData.GetScalars() + scalar := &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{}, + } + switch fieldType.Scalars.Data.(type) { + case *schemapb.ScalarField_BoolData: + scalar.Scalars.Data = &schemapb.ScalarField_BoolData{ + BoolData: &schemapb.BoolArray{ + Data: make([]bool, 0, topK), + }, + } + case *schemapb.ScalarField_IntData: + scalar.Scalars.Data = &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: make([]int32, 0, topK), + }, + } + case *schemapb.ScalarField_LongData: + scalar.Scalars.Data = &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: make([]int64, 0, topK), + }, + } + case *schemapb.ScalarField_FloatData: + scalar.Scalars.Data = &schemapb.ScalarField_FloatData{ + FloatData: &schemapb.FloatArray{ + Data: make([]float32, 0, topK), + }, + } + case *schemapb.ScalarField_DoubleData: + scalar.Scalars.Data = &schemapb.ScalarField_DoubleData{ + DoubleData: &schemapb.DoubleArray{ + Data: make([]float64, 0, topK), + }, + } + case *schemapb.ScalarField_StringData: + scalar.Scalars.Data = &schemapb.ScalarField_StringData{ + StringData: &schemapb.StringArray{ + Data: make([]string, 0, topK), + }, + } + 
case *schemapb.ScalarField_JsonData: + scalar.Scalars.Data = &schemapb.ScalarField_JsonData{ + JsonData: &schemapb.JSONArray{ + Data: make([][]byte, 0, topK), + }, + } + case *schemapb.ScalarField_ArrayData: + scalar.Scalars.Data = &schemapb.ScalarField_ArrayData{ + ArrayData: &schemapb.ArrayArray{ + Data: make([]*schemapb.ScalarField, 0, topK), + ElementType: scalarField.GetArrayData().GetElementType(), + }, + } + } + fd.Field = scalar + case *schemapb.FieldData_Vectors: + vectorField := fieldData.GetVectors() + dim := vectorField.GetDim() + vectors := &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: dim, + }, + } + switch fieldType.Vectors.Data.(type) { + case *schemapb.VectorField_FloatVector: + vectors.Vectors.Data = &schemapb.VectorField_FloatVector{ + FloatVector: &schemapb.FloatArray{ + Data: make([]float32, 0, dim*topK), + }, + } + case *schemapb.VectorField_Float16Vector: + vectors.Vectors.Data = &schemapb.VectorField_Float16Vector{ + Float16Vector: make([]byte, 0, topK*dim*2), + } + case *schemapb.VectorField_BinaryVector: + vectors.Vectors.Data = &schemapb.VectorField_BinaryVector{ + BinaryVector: make([]byte, 0, topK*dim/8), + } + } + fd.Field = vectors + } + result = append(result, fd) + } + return result +} + // AppendFieldData appends fields data of specified index from src to dst func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx int64) (appendSize int64) { for i, fieldData := range src { diff --git a/pkg/util/typeutil/schema_test.go b/pkg/util/typeutil/schema_test.go index 5a209c00d1..81965a5dc7 100644 --- a/pkg/util/typeutil/schema_test.go +++ b/pkg/util/typeutil/schema_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -1323,3 +1324,305 @@ func TestMergeFieldData(t *testing.T) { assert.Error(t, err) }) } + +type FieldDataSuite 
struct { + suite.Suite +} + +func (s *FieldDataSuite) TestPrepareFieldData() { + fieldID := int64(100) + fieldName := "testField" + topK := int64(100) + + s.Run("bool", func() { + samples := []*schemapb.FieldData{ + { + FieldId: fieldID, + FieldName: fieldName, + Type: schemapb.DataType_Bool, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_BoolData{}, + }, + }, + }, + } + + fields := PrepareResultFieldData(samples, topK) + s.Require().Len(fields, 1) + field := fields[0] + s.Equal(fieldID, field.GetFieldId()) + s.Equal(fieldName, field.GetFieldName()) + s.Equal(schemapb.DataType_Bool, field.GetType()) + + s.EqualValues(topK, cap(field.GetScalars().GetBoolData().GetData())) + }) + + s.Run("int", func() { + dataTypes := []schemapb.DataType{ + schemapb.DataType_Int32, + schemapb.DataType_Int16, + schemapb.DataType_Int8, + } + for _, dataType := range dataTypes { + samples := []*schemapb.FieldData{ + { + FieldId: fieldID, + FieldName: fieldName, + Type: dataType, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{}, + }, + }, + }, + } + + fields := PrepareResultFieldData(samples, topK) + s.Require().Len(fields, 1) + field := fields[0] + s.Equal(fieldID, field.GetFieldId()) + s.Equal(fieldName, field.GetFieldName()) + s.Equal(dataType, field.GetType()) + + s.EqualValues(topK, cap(field.GetScalars().GetIntData().GetData())) + } + }) + + s.Run("long", func() { + samples := []*schemapb.FieldData{ + { + FieldId: fieldID, + FieldName: fieldName, + Type: schemapb.DataType_Int64, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{}, + }, + }, + }, + } + + fields := PrepareResultFieldData(samples, topK) + s.Require().Len(fields, 1) + field := fields[0] + s.Equal(fieldID, field.GetFieldId()) + s.Equal(fieldName, field.GetFieldName()) + s.Equal(schemapb.DataType_Int64, field.GetType()) + + s.EqualValues(topK, 
cap(field.GetScalars().GetLongData().GetData())) + }) + + s.Run("float", func() { + samples := []*schemapb.FieldData{ + { + FieldId: fieldID, + FieldName: fieldName, + Type: schemapb.DataType_Float, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_FloatData{}, + }, + }, + }, + } + + fields := PrepareResultFieldData(samples, topK) + s.Require().Len(fields, 1) + field := fields[0] + s.Equal(fieldID, field.GetFieldId()) + s.Equal(fieldName, field.GetFieldName()) + s.Equal(schemapb.DataType_Float, field.GetType()) + + s.EqualValues(topK, cap(field.GetScalars().GetFloatData().GetData())) + }) + + s.Run("double", func() { + samples := []*schemapb.FieldData{ + { + FieldId: fieldID, + FieldName: fieldName, + Type: schemapb.DataType_Double, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_DoubleData{}, + }, + }, + }, + } + + fields := PrepareResultFieldData(samples, topK) + s.Require().Len(fields, 1) + field := fields[0] + s.Equal(fieldID, field.GetFieldId()) + s.Equal(fieldName, field.GetFieldName()) + s.Equal(schemapb.DataType_Double, field.GetType()) + + s.EqualValues(topK, cap(field.GetScalars().GetDoubleData().GetData())) + }) + + s.Run("string", func() { + dataTypes := []schemapb.DataType{ + schemapb.DataType_VarChar, + schemapb.DataType_String, + } + for _, dataType := range dataTypes { + samples := []*schemapb.FieldData{ + { + FieldId: fieldID, + FieldName: fieldName, + Type: dataType, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_StringData{}, + }, + }, + }, + } + + fields := PrepareResultFieldData(samples, topK) + s.Require().Len(fields, 1) + field := fields[0] + s.Equal(fieldID, field.GetFieldId()) + s.Equal(fieldName, field.GetFieldName()) + s.Equal(dataType, field.GetType()) + + s.EqualValues(topK, cap(field.GetScalars().GetStringData().GetData())) + } + }) + + s.Run("json", func() { + samples := 
[]*schemapb.FieldData{ + { + FieldId: fieldID, + FieldName: fieldName, + Type: schemapb.DataType_JSON, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_JsonData{}, + }, + }, + }, + } + + fields := PrepareResultFieldData(samples, topK) + s.Require().Len(fields, 1) + field := fields[0] + s.Equal(fieldID, field.GetFieldId()) + s.Equal(fieldName, field.GetFieldName()) + s.Equal(schemapb.DataType_JSON, field.GetType()) + + s.EqualValues(topK, cap(field.GetScalars().GetJsonData().GetData())) + }) + + s.Run("array", func() { + samples := []*schemapb.FieldData{ + { + FieldId: fieldID, + FieldName: fieldName, + Type: schemapb.DataType_Array, + Field: &schemapb.FieldData_Scalars{ + Scalars: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_ArrayData{ + ArrayData: &schemapb.ArrayArray{ + ElementType: schemapb.DataType_Bool, + }, + }, + }, + }, + }, + } + + fields := PrepareResultFieldData(samples, topK) + s.Require().Len(fields, 1) + field := fields[0] + s.Equal(fieldID, field.GetFieldId()) + s.Equal(fieldName, field.GetFieldName()) + s.Equal(schemapb.DataType_Array, field.GetType()) + + s.EqualValues(topK, cap(field.GetScalars().GetArrayData().GetData())) + s.Equal(schemapb.DataType_Bool, field.GetScalars().GetArrayData().GetElementType()) + }) + + s.Run("float_vector", func() { + samples := []*schemapb.FieldData{ + { + FieldId: fieldID, + FieldName: fieldName, + Type: schemapb.DataType_FloatVector, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: 128, + Data: &schemapb.VectorField_FloatVector{}, + }, + }, + }, + } + + fields := PrepareResultFieldData(samples, topK) + s.Require().Len(fields, 1) + field := fields[0] + s.Equal(fieldID, field.GetFieldId()) + s.Equal(fieldName, field.GetFieldName()) + s.Equal(schemapb.DataType_FloatVector, field.GetType()) + + s.EqualValues(128, field.GetVectors().GetDim()) + s.EqualValues(topK*128, cap(field.GetVectors().GetFloatVector().GetData())) + }) + + 
s.Run("float16_vector", func() { + samples := []*schemapb.FieldData{ + { + FieldId: fieldID, + FieldName: fieldName, + Type: schemapb.DataType_Float16Vector, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: 128, + Data: &schemapb.VectorField_Float16Vector{}, + }, + }, + }, + } + + fields := PrepareResultFieldData(samples, topK) + s.Require().Len(fields, 1) + field := fields[0] + s.Equal(fieldID, field.GetFieldId()) + s.Equal(fieldName, field.GetFieldName()) + s.Equal(schemapb.DataType_Float16Vector, field.GetType()) + + s.EqualValues(128, field.GetVectors().GetDim()) + s.EqualValues(topK*128*2, cap(field.GetVectors().GetFloat16Vector())) + }) + + s.Run("binary_vector", func() { + samples := []*schemapb.FieldData{ + { + FieldId: fieldID, + FieldName: fieldName, + Type: schemapb.DataType_BinaryVector, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: 128, + Data: &schemapb.VectorField_BinaryVector{}, + }, + }, + }, + } + + fields := PrepareResultFieldData(samples, topK) + s.Require().Len(fields, 1) + field := fields[0] + s.Equal(fieldID, field.GetFieldId()) + s.Equal(fieldName, field.GetFieldName()) + s.Equal(schemapb.DataType_BinaryVector, field.GetType()) + + s.EqualValues(128, field.GetVectors().GetDim()) + s.EqualValues(topK*128/8, cap(field.GetVectors().GetBinaryVector())) + }) +} + +func TestFieldData(t *testing.T) { + suite.Run(t, new(FieldDataSuite)) +} diff --git a/tests/integration/jsonexpr/json_expr_test.go b/tests/integration/jsonexpr/json_expr_test.go index c5b397c08e..b5c3a196c0 100644 --- a/tests/integration/jsonexpr/json_expr_test.go +++ b/tests/integration/jsonexpr/json_expr_test.go @@ -206,8 +206,8 @@ func (s *JSONExprSuite) TestJSON_InsertWithoutDynamicData() { // search expr = `$meta["A"] > 90` checkFunc := func(result *milvuspb.SearchResults) { - for _, f := range result.Results.GetFieldsData() { - s.Nil(f) + for _, topk := range result.GetResults().GetTopks() { + s.Zero(topk) } } 
s.doSearch(collectionName, []string{common.MetaFieldName}, expr, dim, checkFunc) @@ -564,8 +564,8 @@ func (s *JSONExprSuite) checkSearch(collectionName, fieldName string, dim int) { expr = `exists AAA` checkFunc = func(result *milvuspb.SearchResults) { - for _, f := range result.Results.GetFieldsData() { - s.Nil(f) + for _, topk := range result.GetResults().GetTopks() { + s.Zero(topk) } } s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) @@ -613,8 +613,8 @@ func (s *JSONExprSuite) checkSearch(collectionName, fieldName string, dim int) { expr = `A like "10"` checkFunc = func(result *milvuspb.SearchResults) { - for _, f := range result.Results.GetFieldsData() { - s.Nil(f) + for _, topk := range result.GetResults().GetTopks() { + s.Zero(topk) } } s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) @@ -632,8 +632,8 @@ func (s *JSONExprSuite) checkSearch(collectionName, fieldName string, dim int) { expr = `str1 like 'abc"def-%'` checkFunc = func(result *milvuspb.SearchResults) { - for _, f := range result.Results.GetFieldsData() { - s.Nil(f) + for _, topk := range result.GetResults().GetTopks() { + s.Zero(topk) } } s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) @@ -641,8 +641,8 @@ func (s *JSONExprSuite) checkSearch(collectionName, fieldName string, dim int) { expr = `str2 like 'abc\\"def-%'` checkFunc = func(result *milvuspb.SearchResults) { - for _, f := range result.Results.GetFieldsData() { - s.Nil(f) + for _, topk := range result.GetResults().GetTopks() { + s.Zero(topk) } } s.doSearch(collectionName, []string{fieldName}, expr, dim, checkFunc) @@ -660,8 +660,8 @@ func (s *JSONExprSuite) checkSearch(collectionName, fieldName string, dim int) { expr = `A in []` checkFunc = func(result *milvuspb.SearchResults) { - for _, f := range result.Results.GetFieldsData() { - s.Nil(f) + for _, topk := range result.GetResults().GetTopks() { + s.Zero(topk) } } s.doSearch(collectionName, []string{fieldName}, expr, dim, 
checkFunc) @@ -1116,8 +1116,8 @@ func (s *JSONExprSuite) TestJsonContains() { expr = `json_contains_all(C, [0, 99])` checkFunc = func(result *milvuspb.SearchResults) { - for _, f := range result.Results.GetFieldsData() { - s.Nil(f) + for _, topk := range result.GetResults().GetTopks() { + s.Zero(topk) } } s.doSearch(collectionName, []string{"A"}, expr, dim, checkFunc) @@ -1133,8 +1133,8 @@ func (s *JSONExprSuite) TestJsonContains() { expr = `json_contains_any(C, [101, 102])` checkFunc = func(result *milvuspb.SearchResults) { - for _, f := range result.Results.GetFieldsData() { - s.Nil(f) + for _, topk := range result.GetResults().GetTopks() { + s.Zero(topk) } } s.doSearch(collectionName, []string{"A"}, expr, dim, checkFunc)