fix: highlight with multi analyzer failed (#46527)
relate: https://github.com/milvus-io/milvus/issues/46498

Release notes (auto-generated by coderabbit.ai):

- Core invariant: text fields configured with multi_analyzer_params must include a "by_field" string that names another field containing per-row analyzer choices; schemaInfo.GetMultiAnalyzerNameFieldID caches and returns the dependent field ID (or 0 if none) and relies on that mapping to make per-row analyzer names available to the highlighter.
- What changed / simplified: the highlighter is now schema-aware. addTaskWithSearchText accepts *schemaInfo and uses GetMultiAnalyzerNameFieldID to resolve the analyzer-name field; resolution and caching moved into schemaInfo.multiAnalyzerFieldMap (meta_cache.go), eliminating ad-hoc, typeutil-only lookups and duplicated logic; GetMultiAnalyzerParams now gates on EnableAnalyzer(), centralizing analyzer enablement checks.
- Why this fixes the bug (root cause): fixes #46498. Previously the highlighter failed when the analyzer "by_field" column was not in output_fields. The change (1) populates task.AnalyzerNames (defaulting missing names to "default") when multi-analyzer is configured and (2) appends the analyzer-name field ID to LexicalHighlighter.extraFields so FieldIDs includes it; the operator then requests the analyzer-name column at search time, ensuring per-row analyzer selection is available for highlighting.
- No data loss or regression: when no multi-analyzer is configured, GetMultiAnalyzerNameFieldID returns 0 and behavior is unchanged; the patch only adds the analyzer-name field to the requested output IDs (no mutation of stored data). Error handling on malformed params is preserved (errors are returned instead of silently changing data), and single-analyzer behavior remains untouched.

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
parent ebe82db4fe
commit 90809d1d86
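For context, a rough sketch of the kind of schema this fix targets: a text field whose multi_analyzer_params names a "by_field" column carrying the per-row analyzer choice. Only the "multi_analyzer_params" and "by_field" keys come from the changed code below; the field names, IDs, analyzer map, and import paths are illustrative assumptions, not part of this commit.

package example

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
)

// exampleSchema sketches a collection where the "text" field picks its analyzer
// per row from the value of the "language" field (named via "by_field").
// Field names, IDs, and the analyzer definitions are assumptions for illustration.
func exampleSchema() *schemapb.CollectionSchema {
	return &schemapb.CollectionSchema{
		Name: "docs",
		Fields: []*schemapb.FieldSchema{
			{
				FieldID:  101,
				Name:     "text",
				DataType: schemapb.DataType_VarChar,
				TypeParams: []*commonpb.KeyValuePair{
					{Key: "max_length", Value: "65535"},
					{Key: "enable_analyzer", Value: "true"},
					{Key: "multi_analyzer_params", Value: `{"by_field": "language", "analyzers": {"en": {"type": "english"}, "default": {"type": "standard"}}}`},
				},
			},
			{
				// holds the per-row analyzer name; after this fix the highlighter
				// requests this column itself instead of requiring it in output_fields
				FieldID:    102,
				Name:       "language",
				DataType:   schemapb.DataType_VarChar,
				TypeParams: []*commonpb.KeyValuePair{{Key: "max_length", Value: "64"}},
			},
		},
	}
}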
@@ -17,7 +17,6 @@ import (
	"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
	"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

const (

@@ -53,7 +52,8 @@ type highlightQuery struct {
}

type LexicalHighlighter struct {
	tasks map[int64]*highlightTask // fieldID -> highlightTask
	tasks       map[int64]*highlightTask // fieldID -> highlightTask
	extraFields []int64                  // extra fields id for fetch analyzer name of multi analyzers
	// option for all highlight task
	// TODO: support set option for each task
	preTags [][]byte

@@ -65,7 +65,7 @@ type LexicalHighlighter struct {

// add highlight task with search
// must used before addTaskWithQuery
func (h *LexicalHighlighter) addTaskWithSearchText(fieldID int64, fieldName string, analyzerName string, texts []string) error {
func (h *LexicalHighlighter) addTaskWithSearchText(collInfo *schemaInfo, fieldID int64, fieldName string, analyzerName string, texts []string) error {
	_, ok := h.tasks[fieldID]
	if ok {
		return merr.WrapErrParameterInvalidMsg("not support hybrid search with highlight now. fieldID: %d", fieldID)

@@ -84,11 +84,24 @@ func (h *LexicalHighlighter) addTaskWithSearchText(fieldID int64, fieldName stri
	task.Texts = texts
	task.SearchTextNum = int64(len(texts))
	if analyzerName != "" {

	// try get multi analyzer name field id
	nameFieldID, err := collInfo.GetMultiAnalyzerNameFieldID(fieldID)
	if err != nil {
		return err
	}
	// set analyzer name and extra field id for multi analyzer
	if nameFieldID > 0 {
		// if multi analyzer name field id is found, set analyzer name to default
		if analyzerName == "" {
			analyzerName = "default"
		}

		task.AnalyzerNames = []string{}
		for i := 0; i < len(texts); i++ {
			task.AnalyzerNames = append(task.AnalyzerNames, analyzerName)
		}
		h.extraFields = append(h.extraFields, nameFieldID)
	}
	return nil
}

@@ -132,14 +145,15 @@ func (h *LexicalHighlighter) AsSearchPipelineOperator(t *searchTask) (operator,
}

func (h *LexicalHighlighter) FieldIDs() []int64 {
	return lo.Keys(h.tasks)
	return append(lo.Keys(h.tasks), h.extraFields...)
}

func NewLexicalHighlighter(highlighter *commonpb.Highlighter) (*LexicalHighlighter, error) {
	params := funcutil.KeyValuePair2Map(highlighter.GetParams())
	h := &LexicalHighlighter{
		tasks:   make(map[int64]*highlightTask),
		options: &querypb.HighlightOptions{},
		tasks:       make(map[int64]*highlightTask),
		options:     &querypb.HighlightOptions{},
		extraFields: make([]int64, 0),
	}

	// set pre_tags and post_tags

@@ -271,10 +285,10 @@ func NewLexicalHighlighter(highlighter *commonpb.Highlighter) (*LexicalHighlight
}

type lexicalHighlightOperator struct {
	tasks        []*highlightTask
	fieldSchemas []*schemapb.FieldSchema
	lbPolicy     shardclient.LBPolicy
	scheduler    *taskScheduler
	tasks     []*highlightTask
	schema    *schemaInfo
	lbPolicy  shardclient.LBPolicy
	scheduler *taskScheduler

	collectionName string
	collectionID   int64

@@ -286,7 +300,7 @@ func newLexicalHighlightOperator(t *searchTask, tasks []*highlightTask) (operato
		tasks:          tasks,
		lbPolicy:       t.lb,
		scheduler:      t.node.(*Proxy).sched,
		fieldSchemas:   typeutil.GetAllFieldSchemas(t.schema.CollectionSchema),
		schema:         t.schema,
		collectionName: t.request.CollectionName,
		collectionID:   t.CollectionID,
		dbName:         t.request.DbName,

@@ -314,30 +328,23 @@ func (op *lexicalHighlightOperator) run(ctx context.Context, span trace.Span, in
		texts := textFieldDatas.GetScalars().GetStringData().GetData()
		task.Texts = append(task.Texts, texts...)
		task.CorpusTextNum = int64(len(texts))
		field, ok := lo.Find(op.fieldSchemas, func(schema *schemapb.FieldSchema) bool {
			return schema.GetFieldID() == task.GetFieldId()
		})
		if !ok {
			return nil, errors.Errorf("get highlight failed, field not found in schema %s: %d", task.GetFieldName(), task.GetFieldId())

		field, err := op.schema.schemaHelper.GetFieldFromID(task.GetFieldId())
		if err != nil {
			return nil, err
		}

		nameFieldID, err := op.schema.GetMultiAnalyzerNameFieldID(field.GetFieldID())
		if err != nil {
			return nil, err
		}

		// if use multi analyzer
		// get analyzer field data
		helper := typeutil.CreateFieldSchemaHelper(field)
		if v, ok := helper.GetMultiAnalyzerParams(); ok {
			params := map[string]any{}
			err := json.Unmarshal([]byte(v), &params)
			if err != nil {
				return nil, errors.Errorf("get highlight failed, get invalid multi analyzer params: %v", err)
			}
			analyzerField, ok := params["by_field"]
		if nameFieldID > 0 {
			analyzerFieldDatas, ok := lo.Find(datas, func(data *schemapb.FieldData) bool { return data.FieldId == nameFieldID })
			if !ok {
				return nil, errors.Errorf("get highlight failed, get invalid multi analyzer params, no by_field")
			}

			analyzerFieldDatas, ok := lo.Find(datas, func(data *schemapb.FieldData) bool { return data.FieldName == analyzerField.(string) })
			if !ok {
				return nil, errors.Errorf("get highlight failed, analyzer field not in output field")
				return nil, errors.Errorf("get highlight failed, analyzer name field: %d for multi analyzer not in output field", nameFieldID)
			}
			task.AnalyzerNames = append(task.AnalyzerNames, analyzerFieldDatas.GetScalars().GetStringData().GetData()...)
		}

@@ -18,6 +18,7 @@ package proxy

import (
	"context"
	"encoding/json"
	"fmt"
	"strconv"
	"strings"

@@ -119,10 +120,11 @@ type databaseInfo struct {
// with extra fields mapping and methods
type schemaInfo struct {
	*schemapb.CollectionSchema
	fieldMap             *typeutil.ConcurrentMap[string, int64] // field name to id mapping
	hasPartitionKeyField bool
	pkField              *schemapb.FieldSchema
	schemaHelper         *typeutil.SchemaHelper
	fieldMap              *typeutil.ConcurrentMap[string, int64] // field name to id mapping
	hasPartitionKeyField  bool
	pkField               *schemapb.FieldSchema
	multiAnalyzerFieldMap *typeutil.ConcurrentMap[int64, int64] // multi analyzer field id to dependent field id mapping
	schemaHelper          *typeutil.SchemaHelper
}

func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo {

@@ -148,11 +150,12 @@ func newSchemaInfo(schema *schemapb.CollectionSchema) *schemaInfo {
	// partial load shall be processed as hint after tiered storage feature
	schemaHelper, _ := typeutil.CreateSchemaHelper(schema)
	return &schemaInfo{
		CollectionSchema:     schema,
		fieldMap:             fieldMap,
		hasPartitionKeyField: hasPartitionkey,
		pkField:              pkField,
		schemaHelper:         schemaHelper,
		CollectionSchema:      schema,
		fieldMap:              fieldMap,
		hasPartitionKeyField:  hasPartitionkey,
		pkField:               pkField,
		multiAnalyzerFieldMap: typeutil.NewConcurrentMap[int64, int64](),
		schemaHelper:          schemaHelper,
	}
}

@@ -171,6 +174,48 @@ func (s *schemaInfo) GetPkField() (*schemapb.FieldSchema, error) {
	return s.pkField, nil
}

func (s *schemaInfo) GetMultiAnalyzerNameFieldID(id int64) (int64, error) {
	if id, ok := s.multiAnalyzerFieldMap.Get(id); ok {
		return id, nil
	}

	field, err := s.schemaHelper.GetFieldFromID(id)
	if err != nil {
		return 0, err
	}

	helper := typeutil.CreateFieldSchemaHelper(field)

	params, ok := helper.GetMultiAnalyzerParams()
	if !ok {
		s.multiAnalyzerFieldMap.Insert(id, 0)
		return 0, nil
	}

	var raw map[string]json.RawMessage
	err = json.Unmarshal([]byte(params), &raw)
	if err != nil {
		return 0, err
	}

	jsonFieldID, ok := raw["by_field"]
	if !ok {
		return 0, merr.WrapErrServiceInternal("multi_analyzer_params missing required 'by_field' key")
	}
	var analyzerFieldName string
	err = json.Unmarshal(jsonFieldID, &analyzerFieldName)
	if err != nil {
		return 0, err
	}
	analyzerField, err := s.schemaHelper.GetFieldFromName(analyzerFieldName)
	if err != nil {
		return 0, err
	}

	s.multiAnalyzerFieldMap.Insert(id, analyzerField.GetFieldID())
	return analyzerField.GetFieldID(), nil
}

// GetLoadFieldIDs returns field id for load field list.
// If input `loadFields` is empty, use collection schema definition.
// Otherwise, perform load field list constraint check then return field id.

@@ -602,7 +602,7 @@ func (t *searchTask) createLexicalHighlighter(highlighter *commonpb.Highlighter,
		if err != nil {
			return err
		}
		err = h.addTaskWithSearchText(fieldId, fieldName, analyzerName, texts)
		err = h.addTaskWithSearchText(t.schema, fieldId, fieldName, analyzerName, texts)
		if err != nil {
			return err
		}

@@ -4920,6 +4920,8 @@ func TestSearchTask_AddHighlightTask(t *testing.T) {
		},
	}

	schemaInfo := newSchemaInfo(schema)

	placeholder := &commonpb.PlaceholderGroup{
		Placeholders: []*commonpb.PlaceholderValue{{
			Type: commonpb.PlaceholderType_VarChar,

@@ -4932,9 +4934,7 @@ func TestSearchTask_AddHighlightTask(t *testing.T) {

	t.Run("lexical highlight success", func(t *testing.T) {
		task := &searchTask{
			schema: &schemaInfo{
				CollectionSchema: schema,
			},
			schema: schemaInfo,
		}

		highlighter := &commonpb.Highlighter{

@@ -4954,9 +4954,7 @@ func TestSearchTask_AddHighlightTask(t *testing.T) {

	t.Run("Lexical highlight with custom tags", func(t *testing.T) {
		task := &searchTask{
			schema: &schemaInfo{
				CollectionSchema: schema,
			},
			schema: schemaInfo,
		}

		highlighter := &commonpb.Highlighter{

@@ -4977,9 +4975,7 @@ func TestSearchTask_AddHighlightTask(t *testing.T) {

	t.Run("lexical highlight with wrong metric type", func(t *testing.T) {
		task := &searchTask{
			schema: &schemaInfo{
				CollectionSchema: schema,
			},
			schema:        schemaInfo,
			SearchRequest: &internalpb.SearchRequest{},
			request:       &milvuspb.SearchRequest{},
		}

@@ -4995,9 +4991,7 @@ func TestSearchTask_AddHighlightTask(t *testing.T) {

	t.Run("lexical highlight with invalid pre_tags type", func(t *testing.T) {
		task := &searchTask{
			schema: &schemaInfo{
				CollectionSchema: schema,
			},
			schema: schemaInfo,
		}

		highlighter := &commonpb.Highlighter{

@@ -5021,10 +5015,9 @@ func TestSearchTask_AddHighlightTask(t *testing.T) {
			},
		}

		schemaInfo := newSchemaInfo(schemaWithoutBM25)
		task := &searchTask{
			schema: &schemaInfo{
				CollectionSchema: schemaWithoutBM25,
			},
			schema: schemaInfo,
		}

		highlighter := &commonpb.Highlighter{

@@ -5038,9 +5031,7 @@ func TestSearchTask_AddHighlightTask(t *testing.T) {

	t.Run("highlight without highlight search text", func(t *testing.T) {
		task := &searchTask{
			schema: &schemaInfo{
				CollectionSchema: schema,
			},
			schema: schemaInfo,
		}

		highlighter := &commonpb.Highlighter{

@@ -5054,9 +5045,7 @@ func TestSearchTask_AddHighlightTask(t *testing.T) {

	t.Run("highlight with invalid highlight search key", func(t *testing.T) {
		task := &searchTask{
			schema: &schemaInfo{
				CollectionSchema: schema,
			},
			schema: schemaInfo,
		}

		highlighter := &commonpb.Highlighter{

@@ -5070,9 +5059,7 @@ func TestSearchTask_AddHighlightTask(t *testing.T) {

	t.Run("highlight with unknown type", func(t *testing.T) {
		task := &searchTask{
			schema: &schemaInfo{
				CollectionSchema: schema,
			},
			schema: schemaInfo,
		}

		highlighter := &commonpb.Highlighter{

@@ -72,7 +72,7 @@ func (h *FieldSchemaHelper) EnableAnalyzer() bool {
}

func (h *FieldSchemaHelper) GetMultiAnalyzerParams() (string, bool) {
	if !IsStringType(h.schema.GetDataType()) {
	if !h.EnableAnalyzer() {
		return "", false
	}
	value, err := h.typeParams.Get("multi_analyzer_params")

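A brief usage sketch of the helper changed in this last hunk. The wrapper function below is illustrative and not part of the commit; CreateFieldSchemaHelper and GetMultiAnalyzerParams are the functions shown in the diff, and gating on EnableAnalyzer() is the behavior this change introduces.

package example

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

// multiAnalyzerParamsOf is an illustrative wrapper, not part of this commit.
// After this change, GetMultiAnalyzerParams returns ("", false) unless the field
// enables an analyzer, so analyzer-enablement checks are centralized in the helper
// (per the release notes above) rather than repeated at each call site.
func multiAnalyzerParamsOf(field *schemapb.FieldSchema) (string, bool) {
	helper := typeutil.CreateFieldSchemaHelper(field)
	return helper.GetMultiAnalyzerParams()
}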