diff --git a/internal/proxy/highlighter.go b/internal/proxy/highlighter.go index c613a393c5..4262579a31 100644 --- a/internal/proxy/highlighter.go +++ b/internal/proxy/highlighter.go @@ -17,7 +17,6 @@ import ( "github.com/milvus-io/milvus/pkg/v2/proto/querypb" "github.com/milvus-io/milvus/pkg/v2/util/funcutil" "github.com/milvus-io/milvus/pkg/v2/util/merr" - "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) const ( @@ -53,7 +52,8 @@ type highlightQuery struct { } type LexicalHighlighter struct { - tasks map[int64]*highlightTask // fieldID -> highlightTask + tasks map[int64]*highlightTask // fieldID -> highlightTask + extraFields []int64 // extra fields id for fetch analyzer name of multi analyzers // option for all highlight task // TODO: support set option for each task preTags [][]byte @@ -65,7 +65,7 @@ type LexicalHighlighter struct { // add highlight task with search // must used before addTaskWithQuery -func (h *LexicalHighlighter) addTaskWithSearchText(fieldID int64, fieldName string, analyzerName string, texts []string) error { +func (h *LexicalHighlighter) addTaskWithSearchText(collInfo *schemaInfo, fieldID int64, fieldName string, analyzerName string, texts []string) error { _, ok := h.tasks[fieldID] if ok { return merr.WrapErrParameterInvalidMsg("not support hybrid search with highlight now. fieldID: %d", fieldID) @@ -84,11 +84,24 @@ func (h *LexicalHighlighter) addTaskWithSearchText(fieldID int64, fieldName stri task.Texts = texts task.SearchTextNum = int64(len(texts)) - if analyzerName != "" { + + // try get multi analyzer name field id + nameFieldID, err := collInfo.GetMultiAnalyzerNameFieldID(fieldID) + if err != nil { + return err + } + // set analyzer name and extra field id for multi analyzer + if nameFieldID > 0 { + // if multi analyzer name field id is found, set analyzer name to default + if analyzerName == "" { + analyzerName = "default" + } + task.AnalyzerNames = []string{} for i := 0; i < len(texts); i++ { task.AnalyzerNames = append(task.AnalyzerNames, analyzerName) } + h.extraFields = append(h.extraFields, nameFieldID) } return nil } @@ -132,14 +145,15 @@ func (h *LexicalHighlighter) AsSearchPipelineOperator(t *searchTask) (operator, } func (h *LexicalHighlighter) FieldIDs() []int64 { - return lo.Keys(h.tasks) + return append(lo.Keys(h.tasks), h.extraFields...) } func NewLexicalHighlighter(highlighter *commonpb.Highlighter) (*LexicalHighlighter, error) { params := funcutil.KeyValuePair2Map(highlighter.GetParams()) h := &LexicalHighlighter{ - tasks: make(map[int64]*highlightTask), - options: &querypb.HighlightOptions{}, + tasks: make(map[int64]*highlightTask), + options: &querypb.HighlightOptions{}, + extraFields: make([]int64, 0), } // set pre_tags and post_tags @@ -271,10 +285,10 @@ func NewLexicalHighlighter(highlighter *commonpb.Highlighter) (*LexicalHighlight } type lexicalHighlightOperator struct { - tasks []*highlightTask - fieldSchemas []*schemapb.FieldSchema - lbPolicy shardclient.LBPolicy - scheduler *taskScheduler + tasks []*highlightTask + schema *schemaInfo + lbPolicy shardclient.LBPolicy + scheduler *taskScheduler collectionName string collectionID int64 @@ -286,7 +300,7 @@ func newLexicalHighlightOperator(t *searchTask, tasks []*highlightTask) (operato tasks: tasks, lbPolicy: t.lb, scheduler: t.node.(*Proxy).sched, - fieldSchemas: typeutil.GetAllFieldSchemas(t.schema.CollectionSchema), + schema: t.schema, collectionName: t.request.CollectionName, collectionID: t.CollectionID, dbName: t.request.DbName, @@ -314,30 +328,23 @@ func (op *lexicalHighlightOperator) run(ctx context.Context, span trace.Span, in texts := textFieldDatas.GetScalars().GetStringData().GetData() task.Texts = append(task.Texts, texts...) task.CorpusTextNum = int64(len(texts)) - field, ok := lo.Find(op.fieldSchemas, func(schema *schemapb.FieldSchema) bool { - return schema.GetFieldID() == task.GetFieldId() - }) - if !ok { - return nil, errors.Errorf("get highlight failed, field not found in schema %s: %d", task.GetFieldName(), task.GetFieldId()) + + field, err := op.schema.schemaHelper.GetFieldFromID(task.GetFieldId()) + if err != nil { + return nil, err + } + + nameFieldID, err := op.schema.GetMultiAnalyzerNameFieldID(field.GetFieldID()) + if err != nil { + return nil, err } // if use multi analyzer // get analyzer field data - helper := typeutil.CreateFieldSchemaHelper(field) - if v, ok := helper.GetMultiAnalyzerParams(); ok { - params := map[string]any{} - err := json.Unmarshal([]byte(v), ¶ms) - if err != nil { - return nil, errors.Errorf("get highlight failed, get invalid multi analyzer params-: %v", err) - } - analyzerField, ok := params["by_field"] + if nameFieldID > 0 { + analyzerFieldDatas, ok := lo.Find(datas, func(data *schemapb.FieldData) bool { return data.FieldId == nameFieldID }) if !ok { - return nil, errors.Errorf("get highlight failed, get invalid multi analyzer params, no by_field") - } - - analyzerFieldDatas, ok := lo.Find(datas, func(data *schemapb.FieldData) bool { return data.FieldName == analyzerField.(string) }) - if !ok { - return nil, errors.Errorf("get highlight failed, analyzer field not in output field") + return nil, errors.Errorf("get highlight failed, analyzer name field: %d for multi analyzer not in output field", nameFieldID) } task.AnalyzerNames = append(task.AnalyzerNames, analyzerFieldDatas.GetScalars().GetStringData().GetData()...) } diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index b382e4f6f2..664ca6cc60 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -18,6 +18,7 @@ package proxy import ( "context" + "encoding/json" "fmt" "strconv" "strings" @@ -119,10 +120,11 @@ type databaseInfo struct { // with extra fields mapping and methods type schemaInfo struct { *schemapb.CollectionSchema - fieldMap *typeutil.ConcurrentMap[string, int64] // field name to id mapping - hasPartitionKeyField bool - pkField *schemapb.FieldSchema - schemaHelper *typeutil.SchemaHelper + fieldMap *typeutil.ConcurrentMap[string, int64] // field name to id mapping + hasPartitionKeyField bool + pkField *schemapb.FieldSchema + multiAnalyzerFieldMap *typeutil.ConcurrentMap[int64, int64] // multi analzyer field id to dependent field id mapping + schemaHelper *typeutil.SchemaHelper } func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo { @@ -148,11 +150,12 @@ func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo { // partial load shall be processed as hint after tiered storage feature schemaHelper, _ := typeutil.CreateSchemaHelper(schema) return &schemaInfo{ - CollectionSchema: schema, - fieldMap: fieldMap, - hasPartitionKeyField: hasPartitionkey, - pkField: pkField, - schemaHelper: schemaHelper, + CollectionSchema: schema, + fieldMap: fieldMap, + hasPartitionKeyField: hasPartitionkey, + pkField: pkField, + multiAnalyzerFieldMap: typeutil.NewConcurrentMap[int64, int64](), + schemaHelper: schemaHelper, } } @@ -171,6 +174,48 @@ func (s *schemaInfo) GetPkField() (*schemapb.FieldSchema, error) { return s.pkField, nil } +func (s *schemaInfo) GetMultiAnalyzerNameFieldID(id int64) (int64, error) { + if id, ok := s.multiAnalyzerFieldMap.Get(id); ok { + return id, nil + } + + field, err := s.schemaHelper.GetFieldFromID(id) + if err != nil { + return 0, err + } + + helper := typeutil.CreateFieldSchemaHelper(field) + + params, ok := helper.GetMultiAnalyzerParams() + if !ok { + s.multiAnalyzerFieldMap.Insert(id, 0) + return 0, nil + } + + var raw map[string]json.RawMessage + err = json.Unmarshal([]byte(params), &raw) + if err != nil { + return 0, err + } + + jsonFieldID, ok := raw["by_field"] + if !ok { + return 0, merr.WrapErrServiceInternal("multi_analyzer_params missing required 'by_field' key") + } + var analyzerFieldName string + err = json.Unmarshal(jsonFieldID, &analyzerFieldName) + if err != nil { + return 0, err + } + analyzerField, err := s.schemaHelper.GetFieldFromName(analyzerFieldName) + if err != nil { + return 0, err + } + + s.multiAnalyzerFieldMap.Insert(id, analyzerField.GetFieldID()) + return analyzerField.GetFieldID(), nil +} + // GetLoadFieldIDs returns field id for load field list. // If input `loadFields` is empty, use collection schema definition. // Otherwise, perform load field list constraint check then return field id. diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index 8ad4ccea1d..70b82d76d1 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -602,7 +602,7 @@ func (t *searchTask) createLexicalHighlighter(highlighter *commonpb.Highlighter, if err != nil { return err } - err = h.addTaskWithSearchText(fieldId, fieldName, analyzerName, texts) + err = h.addTaskWithSearchText(t.schema, fieldId, fieldName, analyzerName, texts) if err != nil { return err } diff --git a/internal/proxy/task_search_test.go b/internal/proxy/task_search_test.go index b7ddbaf187..cf05a6d06f 100644 --- a/internal/proxy/task_search_test.go +++ b/internal/proxy/task_search_test.go @@ -4920,6 +4920,8 @@ func TestSearchTask_AddHighlightTask(t *testing.T) { }, } + schemaInfo := newSchemaInfo(schema) + placeholder := &commonpb.PlaceholderGroup{ Placeholders: []*commonpb.PlaceholderValue{{ Type: commonpb.PlaceholderType_VarChar, @@ -4932,9 +4934,7 @@ func TestSearchTask_AddHighlightTask(t *testing.T) { t.Run("lexical highlight success", func(t *testing.T) { task := &searchTask{ - schema: &schemaInfo{ - CollectionSchema: schema, - }, + schema: schemaInfo, } highlighter := &commonpb.Highlighter{ @@ -4954,9 +4954,7 @@ func TestSearchTask_AddHighlightTask(t *testing.T) { t.Run("Lexical highlight with custom tags", func(t *testing.T) { task := &searchTask{ - schema: &schemaInfo{ - CollectionSchema: schema, - }, + schema: schemaInfo, } highlighter := &commonpb.Highlighter{ @@ -4977,9 +4975,7 @@ func TestSearchTask_AddHighlightTask(t *testing.T) { t.Run("lexical highlight with wrong metric type", func(t *testing.T) { task := &searchTask{ - schema: &schemaInfo{ - CollectionSchema: schema, - }, + schema: schemaInfo, SearchRequest: &internalpb.SearchRequest{}, request: &milvuspb.SearchRequest{}, } @@ -4995,9 +4991,7 @@ func TestSearchTask_AddHighlightTask(t *testing.T) { t.Run("lexical highlight with invalid pre_tags type", func(t *testing.T) { task := &searchTask{ - schema: &schemaInfo{ - CollectionSchema: schema, - }, + schema: schemaInfo, } highlighter := &commonpb.Highlighter{ @@ -5021,10 +5015,9 @@ func TestSearchTask_AddHighlightTask(t *testing.T) { }, } + schemaInfo := newSchemaInfo(schemaWithoutBM25) task := &searchTask{ - schema: &schemaInfo{ - CollectionSchema: schemaWithoutBM25, - }, + schema: schemaInfo, } highlighter := &commonpb.Highlighter{ @@ -5038,9 +5031,7 @@ func TestSearchTask_AddHighlightTask(t *testing.T) { t.Run("highlight without highlight search text", func(t *testing.T) { task := &searchTask{ - schema: &schemaInfo{ - CollectionSchema: schema, - }, + schema: schemaInfo, } highlighter := &commonpb.Highlighter{ @@ -5054,9 +5045,7 @@ func TestSearchTask_AddHighlightTask(t *testing.T) { t.Run("highlight with invalid highlight search key", func(t *testing.T) { task := &searchTask{ - schema: &schemaInfo{ - CollectionSchema: schema, - }, + schema: schemaInfo, } highlighter := &commonpb.Highlighter{ @@ -5070,9 +5059,7 @@ func TestSearchTask_AddHighlightTask(t *testing.T) { t.Run("highlight with unknown type", func(t *testing.T) { task := &searchTask{ - schema: &schemaInfo{ - CollectionSchema: schema, - }, + schema: schemaInfo, } highlighter := &commonpb.Highlighter{ diff --git a/pkg/util/typeutil/field_schema.go b/pkg/util/typeutil/field_schema.go index b8ed5de4ff..eab4abf104 100644 --- a/pkg/util/typeutil/field_schema.go +++ b/pkg/util/typeutil/field_schema.go @@ -72,7 +72,7 @@ func (h *FieldSchemaHelper) EnableAnalyzer() bool { } func (h *FieldSchemaHelper) GetMultiAnalyzerParams() (string, bool) { - if !IsStringType(h.schema.GetDataType()) { + if !h.EnableAnalyzer() { return "", false } value, err := h.typeParams.Get("multi_analyzer_params")