enhance: [GoSDK] Handle dynamic column & partial load output (#38258)

Related to #37853

Previous logic cannot handle partial load due to dynamic column
handling. This PR unifies the output translate logic.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-12-06 16:46:41 +08:00 committed by GitHub
parent 8ed019735c
commit aa4eb2f6df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"github.com/samber/lo"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
@ -27,6 +28,7 @@ import (
"github.com/milvus-io/milvus/client/v2/column" "github.com/milvus-io/milvus/client/v2/column"
"github.com/milvus-io/milvus/client/v2/entity" "github.com/milvus-io/milvus/client/v2/entity"
"github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
) )
func (c *Client) Search(ctx context.Context, option SearchOption, callOptions ...grpc.CallOption) ([]ResultSet, error) { func (c *Client) Search(ctx context.Context, option SearchOption, callOptions ...grpc.CallOption) ([]ResultSet, error) {
@ -92,13 +94,24 @@ func (c *Client) handleSearchResult(schema *entity.Schema, outputFields []string
func (c *Client) parseSearchResult(sch *entity.Schema, outputFields []string, fieldDataList []*schemapb.FieldData, _, from, to int) ([]column.Column, error) { func (c *Client) parseSearchResult(sch *entity.Schema, outputFields []string, fieldDataList []*schemapb.FieldData, _, from, to int) ([]column.Column, error) {
var wildcard bool var wildcard bool
// serveral cases shall be handled here
// 1. output fields contains "*" wildcard => the schema shall be checked
// 2. dynamic schema $meta column, with field name not exist in schema
// 3. explicitly specified json column name
// 4. partial load field
// translate "*" into possible field names
// if partial load enabled, result set could miss some column
outputFields, wildcard = expandWildcard(sch, outputFields) outputFields, wildcard = expandWildcard(sch, outputFields)
// duplicated name will have only one column now // duplicated field name will be merged into one column
outputSet := make(map[string]struct{}) outputSet := typeutil.NewSet(outputFields...)
for _, output := range outputFields {
outputSet[output] = struct{}{} // setup schema valid field name to get possible dynamic field name
} schemaFieldSet := typeutil.NewSet(lo.Map(sch.Fields, func(f *entity.Field, _ int) string {
// fields := make(map[string]*schemapb.FieldData) return f.Name
})...)
dynamicNames := outputSet.Complement(schemaFieldSet)
columns := make([]column.Column, 0, len(outputFields)) columns := make([]column.Column, 0, len(outputFields))
var dynamicColumn *column.ColumnJSONBytes var dynamicColumn *column.ColumnJSONBytes
for _, fieldData := range fieldDataList { for _, fieldData := range fieldDataList {
@ -106,6 +119,8 @@ func (c *Client) parseSearchResult(sch *entity.Schema, outputFields []string, fi
if err != nil { if err != nil {
return nil, err return nil, err
} }
// if output data contains dynamic json, setup dynamicColumn
if fieldData.GetIsDynamic() { if fieldData.GetIsDynamic() {
var ok bool var ok bool
dynamicColumn, ok = col.(*column.ColumnJSONBytes) dynamicColumn, ok = col.(*column.ColumnJSONBytes)
@ -119,21 +134,22 @@ func (c *Client) parseSearchResult(sch *entity.Schema, outputFields []string, fi
} }
} }
// remove processed field // remove processed field, remove from possible dynamic set
delete(outputSet, fieldData.GetFieldName()) delete(dynamicNames, fieldData.GetFieldName())
columns = append(columns, col) columns = append(columns, col)
} }
if len(outputSet) > 0 && dynamicColumn == nil { // extra name found and not json output
if len(dynamicNames) > 0 && dynamicColumn == nil {
var extraFields []string var extraFields []string
for output := range outputSet { for output := range dynamicNames {
extraFields = append(extraFields, output) extraFields = append(extraFields, output)
} }
return nil, errors.Newf("extra output fields %v found and result does not dynamic field", extraFields) return nil, errors.Newf("extra output fields %v found and result does not contain dynamic field", extraFields)
} }
// add dynamic column for extra fields // add dynamic column for extra fields
for outputField := range outputSet { for outputField := range dynamicNames {
column := column.NewColumnDynamic(dynamicColumn, outputField) column := column.NewColumnDynamic(dynamicColumn, outputField)
columns = append(columns, column) columns = append(columns, column)
} }