feat: support search bm25 with highlight (#44923)

relate: https://github.com/milvus-io/milvus/issues/42589

---------

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
aoiasd 2025-11-18 16:09:39 +08:00 committed by GitHub
parent 16acf8829b
commit 947c8855f3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 2878 additions and 869 deletions

View File

@ -405,3 +405,14 @@ func (c *Client) ValidateAnalyzer(ctx context.Context, req *querypb.ValidateAnal
return client.ValidateAnalyzer(ctx, req)
})
}
func (c *Client) GetHighlight(ctx context.Context, req *querypb.GetHighlightRequest, _ ...grpc.CallOption) (*querypb.GetHighlightResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(c.nodeID),
)
return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*querypb.GetHighlightResponse, error) {
return client.GetHighlight(ctx, req)
})
}

View File

@ -117,6 +117,9 @@ func Test_NewClient(t *testing.T) {
r23, err := client.ValidateAnalyzer(ctx, nil)
retCheck(retNotNil, r23, err)
r24, err := client.GetHighlight(ctx, nil)
retCheck(retNotNil, r24, err)
// stream rpc
client, err := client.QueryStream(ctx, nil)
retCheck(retNotNil, client, err)

View File

@ -411,3 +411,7 @@ func (s *Server) DropIndex(ctx context.Context, req *querypb.DropIndexRequest) (
func (s *Server) ValidateAnalyzer(ctx context.Context, req *querypb.ValidateAnalyzerRequest) (*commonpb.Status, error) {
return s.querynode.ValidateAnalyzer(ctx, req)
}
func (s *Server) GetHighlight(ctx context.Context, req *querypb.GetHighlightRequest) (*querypb.GetHighlightResponse, error) {
return s.querynode.GetHighlight(ctx, req)
}

View File

@ -290,6 +290,18 @@ func Test_NewServer(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("GetHighlight", func(t *testing.T) {
mockQN.EXPECT().GetHighlight(mock.Anything, mock.Anything).Return(&querypb.GetHighlightResponse{
Status: merr.Success(),
}, nil)
resp, err := server.GetHighlight(ctx, &querypb.GetHighlightRequest{
Channel: "test-channel",
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("ValidateAnalyzer", func(t *testing.T) {
mockQN.EXPECT().ValidateAnalyzer(mock.Anything, mock.Anything).Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil)
req := &querypb.ValidateAnalyzerRequest{}

View File

@ -370,6 +370,65 @@ func (_c *MockQueryNode_GetDataDistribution_Call) RunAndReturn(run func(context.
return _c
}
// GetHighlight provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNode) GetHighlight(_a0 context.Context, _a1 *querypb.GetHighlightRequest) (*querypb.GetHighlightResponse, error) {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for GetHighlight")
}
var r0 *querypb.GetHighlightResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.GetHighlightRequest) (*querypb.GetHighlightResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.GetHighlightRequest) *querypb.GetHighlightResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*querypb.GetHighlightResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.GetHighlightRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryNode_GetHighlight_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetHighlight'
type MockQueryNode_GetHighlight_Call struct {
*mock.Call
}
// GetHighlight is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *querypb.GetHighlightRequest
func (_e *MockQueryNode_Expecter) GetHighlight(_a0 interface{}, _a1 interface{}) *MockQueryNode_GetHighlight_Call {
return &MockQueryNode_GetHighlight_Call{Call: _e.mock.On("GetHighlight", _a0, _a1)}
}
func (_c *MockQueryNode_GetHighlight_Call) Run(run func(_a0 context.Context, _a1 *querypb.GetHighlightRequest)) *MockQueryNode_GetHighlight_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*querypb.GetHighlightRequest))
})
return _c
}
func (_c *MockQueryNode_GetHighlight_Call) Return(_a0 *querypb.GetHighlightResponse, _a1 error) *MockQueryNode_GetHighlight_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryNode_GetHighlight_Call) RunAndReturn(run func(context.Context, *querypb.GetHighlightRequest) (*querypb.GetHighlightResponse, error)) *MockQueryNode_GetHighlight_Call {
_c.Call.Return(run)
return _c
}
// GetMetrics provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNode) GetMetrics(_a0 context.Context, _a1 *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
ret := _m.Called(_a0, _a1)

View File

@ -446,6 +446,80 @@ func (_c *MockQueryNodeClient_GetDataDistribution_Call) RunAndReturn(run func(co
return _c
}
// GetHighlight provides a mock function with given fields: ctx, in, opts
func (_m *MockQueryNodeClient) GetHighlight(ctx context.Context, in *querypb.GetHighlightRequest, opts ...grpc.CallOption) (*querypb.GetHighlightResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for GetHighlight")
}
var r0 *querypb.GetHighlightResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.GetHighlightRequest, ...grpc.CallOption) (*querypb.GetHighlightResponse, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.GetHighlightRequest, ...grpc.CallOption) *querypb.GetHighlightResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*querypb.GetHighlightResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.GetHighlightRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryNodeClient_GetHighlight_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetHighlight'
type MockQueryNodeClient_GetHighlight_Call struct {
*mock.Call
}
// GetHighlight is a helper method to define mock.On call
// - ctx context.Context
// - in *querypb.GetHighlightRequest
// - opts ...grpc.CallOption
func (_e *MockQueryNodeClient_Expecter) GetHighlight(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryNodeClient_GetHighlight_Call {
return &MockQueryNodeClient_GetHighlight_Call{Call: _e.mock.On("GetHighlight",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockQueryNodeClient_GetHighlight_Call) Run(run func(ctx context.Context, in *querypb.GetHighlightRequest, opts ...grpc.CallOption)) *MockQueryNodeClient_GetHighlight_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*querypb.GetHighlightRequest), variadicArgs...)
})
return _c
}
func (_c *MockQueryNodeClient_GetHighlight_Call) Return(_a0 *querypb.GetHighlightResponse, _a1 error) *MockQueryNodeClient_GetHighlight_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryNodeClient_GetHighlight_Call) RunAndReturn(run func(context.Context, *querypb.GetHighlightRequest, ...grpc.CallOption) (*querypb.GetHighlightResponse, error)) *MockQueryNodeClient_GetHighlight_Call {
_c.Call.Return(run)
return _c
}
// GetMetrics provides a mock function with given fields: ctx, in, opts
func (_m *MockQueryNodeClient) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest, opts ...grpc.CallOption) (*milvuspb.GetMetricsResponse, error) {
_va := make([]interface{}, len(opts))

View File

@ -21,6 +21,7 @@ import (
"context"
"fmt"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
@ -29,13 +30,16 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/parser/planparserv2"
"github.com/milvus-io/milvus/internal/proxy/shardclient"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/function/rerank"
"github.com/milvus-io/milvus/internal/util/segcore"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/proto/planpb"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
@ -112,6 +116,7 @@ const (
organizeOp = "organize"
filterFieldOp = "filter_field"
lambdaOp = "lambda"
highlightOp = "highlight"
)
var opFactory = map[string]func(t *searchTask, params map[string]any) (operator, error){
@ -122,6 +127,7 @@ var opFactory = map[string]func(t *searchTask, params map[string]any) (operator,
requeryOp: newRequeryOperator,
lambdaOp: newLambdaOperator,
filterFieldOp: newFilterFieldOperator,
highlightOp: newHighlightOperator,
}
func NewNode(info *nodeDef, t *searchTask) (*Node, error) {
@ -592,6 +598,154 @@ func (op *filterFieldOperator) run(ctx context.Context, span trace.Span, inputs
return []any{result}, nil
}
const (
PreTagsKey = "pre_tags"
PostTagsKey = "post_tags"
HighlightSearchTextKey = "highlight_search_data"
DefaultPreTag = "<em>"
DefaultPostTag = "</em>"
)
type highlightTask struct {
*querypb.HighlightTask
preTags [][]byte
postTags [][]byte
}
type highlightOperator struct {
tasks []*highlightTask
fieldSchemas []*schemapb.FieldSchema
lbPolicy shardclient.LBPolicy
scheduler *taskScheduler
collectionName string
collectionID int64
dbName string
preTag []byte
postTag []byte
}
func newHighlightOperator(t *searchTask, _ map[string]any) (operator, error) {
return &highlightOperator{
tasks: t.highlightTasks,
lbPolicy: t.lb,
scheduler: t.node.(*Proxy).sched,
fieldSchemas: typeutil.GetAllFieldSchemas(t.schema.CollectionSchema),
collectionName: t.request.CollectionName,
collectionID: t.CollectionID,
dbName: t.request.DbName,
}, nil
}
func (op *highlightOperator) run(ctx context.Context, span trace.Span, inputs ...any) ([]any, error) {
result := inputs[0].(*milvuspb.SearchResults)
datas := result.Results.GetFieldsData()
req := &querypb.GetHighlightRequest{
Topks: result.GetResults().GetTopks(),
Tasks: lo.Map(op.tasks, func(task *highlightTask, _ int) *querypb.HighlightTask { return task.HighlightTask }),
}
for _, task := range req.GetTasks() {
textFieldDatas, ok := lo.Find(datas, func(data *schemapb.FieldData) bool { return data.FieldId == task.GetFieldId() })
if !ok {
return nil, errors.Errorf("get highlight failed, text field not in output field %s: %d", task.GetFieldName(), task.GetFieldId())
}
texts := textFieldDatas.GetScalars().GetStringData().GetData()
task.Texts = append(task.Texts, texts...)
task.CorpusTextNum = int64(len(texts))
field, ok := lo.Find(op.fieldSchemas, func(schema *schemapb.FieldSchema) bool {
return schema.GetFieldID() == task.GetFieldId()
})
if !ok {
return nil, errors.Errorf("get highlight failed, field not found in schema %s: %d", task.GetFieldName(), task.GetFieldId())
}
// if use multi analyzer
// get analyzer field data
helper := typeutil.CreateFieldSchemaHelper(field)
if v, ok := helper.GetMultiAnalyzerParams(); ok {
params := map[string]any{}
err := json.Unmarshal([]byte(v), &params)
if err != nil {
return nil, errors.Errorf("get highlight failed, get invalid multi analyzer params-: %v", err)
}
analyzerField, ok := params["by_field"]
if !ok {
return nil, errors.Errorf("get highlight failed, get invalid multi analyzer params, no by_field")
}
analyzerFieldDatas, ok := lo.Find(datas, func(data *schemapb.FieldData) bool { return data.FieldName == analyzerField.(string) })
if !ok {
return nil, errors.Errorf("get highlight failed, analyzer field not in output field")
}
task.AnalyzerNames = append(task.AnalyzerNames, analyzerFieldDatas.GetScalars().GetStringData().GetData()...)
}
}
task := &HighlightTask{
ctx: ctx,
lb: op.lbPolicy,
Condition: NewTaskCondition(ctx),
GetHighlightRequest: req,
collectionName: op.collectionName,
collectionID: op.collectionID,
dbName: op.dbName,
}
if err := op.scheduler.dqQueue.Enqueue(task); err != nil {
return nil, err
}
if err := task.WaitToFinish(); err != nil {
return nil, err
}
rowNum := len(result.Results.GetScores())
HighlightResults := []*commonpb.HighlightResult{}
if rowNum != 0 {
rowDatas := lo.Map(task.result.Results, func(result *querypb.HighlightResult, i int) *commonpb.HighlightData {
return buildStringFragments(op.tasks[i/rowNum], i%rowNum, result.GetFragments())
})
for i, task := range req.GetTasks() {
HighlightResults = append(HighlightResults, &commonpb.HighlightResult{
FieldName: task.GetFieldName(),
Datas: rowDatas[i*rowNum : (i+1)*rowNum],
})
}
}
result.Results.HighlightResults = HighlightResults
return []any{result}, nil
}
func buildStringFragments(task *highlightTask, idx int, frags []*querypb.HighlightFragment) *commonpb.HighlightData {
bytes := []byte(task.Texts[int(task.GetSearchTextNum())+idx])
preTagsNum := len(task.preTags)
postTagsNum := len(task.postTags)
result := &commonpb.HighlightData{Fragments: make([]string, 0)}
for _, frag := range frags {
fragBytes := []byte{}
cursor := int(frag.GetStartOffset())
for i := 0; i < len(frag.GetOffsets())/2; i++ {
startOffset := int(frag.Offsets[i<<1])
endOffset := int(frag.Offsets[(i<<1)+1])
if cursor < startOffset {
fragBytes = append(fragBytes, bytes[cursor:startOffset]...)
}
fragBytes = append(fragBytes, task.preTags[i%preTagsNum]...)
fragBytes = append(fragBytes, bytes[startOffset:endOffset]...)
fragBytes = append(fragBytes, task.postTags[i%postTagsNum]...)
cursor = endOffset
}
if cursor < int(frag.GetEndOffset()) {
fragBytes = append(fragBytes, bytes[cursor:frag.GetEndOffset()]...)
}
result.Fragments = append(result.Fragments, string(fragBytes))
}
return result
}
func mergeIDsFunc(ctx context.Context, span trace.Span, inputs ...any) ([]any, error) {
multipleMilvusResults := inputs[0].([]*milvuspb.SearchResults)
idInt64Type := false
@ -648,6 +802,17 @@ func newPipeline(pipeDef *pipelineDef, t *searchTask) (*pipeline, error) {
return &pipeline{name: pipeDef.name, nodes: nodes}, nil
}
func (p *pipeline) AddNodes(t *searchTask, nodes ...*nodeDef) error {
for _, def := range nodes {
node, err := NewNode(def, t)
if err != nil {
return err
}
p.nodes = append(p.nodes, node)
}
return nil
}
func (p *pipeline) Run(ctx context.Context, span trace.Span, toReduceResults []*internalpb.SearchResults, storageCost segcore.StorageCost) (*milvuspb.SearchResults, segcore.StorageCost, error) {
log.Ctx(ctx).Debug("SearchPipeline run", zap.String("pipeline", p.String()))
msg := opMsg{}
@ -678,6 +843,20 @@ type pipelineDef struct {
nodes []*nodeDef
}
var filterFieldNode = &nodeDef{
name: "filter_field",
inputs: []string{"result"},
outputs: []string{"output"},
opName: filterFieldOp,
}
var highlightNode = &nodeDef{
name: "highlight",
inputs: []string{"result"},
outputs: []string{"output"},
opName: highlightOp,
}
var searchPipe = &pipelineDef{
name: "search",
nodes: []*nodeDef{
@ -699,12 +878,6 @@ var searchPipe = &pipelineDef{
},
opName: lambdaOp,
},
{
name: "filter_field",
inputs: []string{"result"},
outputs: []string{"output"},
opName: filterFieldOp,
},
},
}
@ -763,12 +936,6 @@ var searchWithRequeryPipe = &pipelineDef{
},
opName: lambdaOp,
},
{
name: "filter_field",
inputs: []string{"result"},
outputs: []string{"output"},
opName: filterFieldOp,
},
},
}
@ -821,12 +988,6 @@ var searchWithRerankPipe = &pipelineDef{
},
opName: lambdaOp,
},
{
name: "filter_field",
inputs: []string{"result"},
outputs: []string{"output"},
opName: filterFieldOp,
},
},
}
@ -895,12 +1056,6 @@ var searchWithRerankRequeryPipe = &pipelineDef{
},
opName: lambdaOp,
},
{
name: "filter_field",
inputs: []string{"result"},
outputs: []string{"output"},
opName: filterFieldOp,
},
},
}
@ -919,12 +1074,6 @@ var hybridSearchPipe = &pipelineDef{
outputs: []string{"result"},
opName: rerankOp,
},
{
name: "filter_field",
inputs: []string{"result"},
outputs: []string{"output"},
opName: filterFieldOp,
},
},
}
@ -1026,12 +1175,6 @@ var hybridSearchWithRequeryAndRerankByFieldDataPipe = &pipelineDef{
},
opName: lambdaOp,
},
{
name: "filter_field",
inputs: []string{"result"},
outputs: []string{"output"},
opName: filterFieldOp,
},
},
}
@ -1128,3 +1271,23 @@ func newBuiltInPipeline(t *searchTask) (*pipeline, error) {
}
return nil, fmt.Errorf("Unsupported pipeline")
}
func newSearchPipeline(t *searchTask) (*pipeline, error) {
p, err := newBuiltInPipeline(t)
if err != nil {
return nil, err
}
if len(t.highlightTasks) > 0 {
err := p.AddNodes(t, highlightNode, filterFieldNode)
if err != nil {
return nil, err
}
} else {
err := p.AddNodes(t, filterFieldNode)
if err != nil {
return nil, err
}
}
return p, nil
}

View File

@ -24,6 +24,7 @@ import (
"time"
"github.com/bytedance/mockey"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
@ -31,11 +32,15 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proxy/shardclient"
"github.com/milvus-io/milvus/internal/util/function/models"
"github.com/milvus-io/milvus/internal/util/function/rerank"
"github.com/milvus-io/milvus/internal/util/segcore"
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
"github.com/milvus-io/milvus/pkg/v2/proto/planpb"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/testutils"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
)
@ -267,6 +272,94 @@ func (s *SearchPipelineSuite) TestOrganizeOp() {
fmt.Println(ret)
}
func (s *SearchPipelineSuite) TestHighlightOp() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proxy := &Proxy{}
proxy.tsoAllocator = &timestampAllocator{
tso: newMockTimestampAllocatorInterface(),
}
sched, err := newTaskScheduler(ctx, proxy.tsoAllocator)
s.Require().NoError(err)
err = sched.Start()
s.Require().NoError(err)
defer sched.Close()
proxy.sched = sched
collName := "test_coll_highlight"
fieldName2Types := map[string]schemapb.DataType{
testVarCharField: schemapb.DataType_VarChar,
}
schema := constructCollectionSchemaByDataType(collName, fieldName2Types, testVarCharField, false)
req := &milvuspb.SearchRequest{
CollectionName: collName,
DbName: "default",
}
highlightTasks := []*highlightTask{
{
HighlightTask: &querypb.HighlightTask{
Texts: []string{"target text"},
FieldName: testVarCharField,
FieldId: 100,
},
preTags: [][]byte{[]byte(DefaultPreTag)},
postTags: [][]byte{[]byte(DefaultPostTag)},
},
}
mockLb := shardclient.NewMockLBPolicy(s.T())
searchTask := &searchTask{
node: proxy,
highlightTasks: highlightTasks,
lb: mockLb,
schema: newSchemaInfo(schema),
request: req,
collectionName: collName,
SearchRequest: &internalpb.SearchRequest{
CollectionID: 0,
},
}
op, err := opFactory[highlightOp](searchTask, map[string]any{})
s.Require().NoError(err)
// mockery
mockLb.EXPECT().ExecuteOneChannel(mock.Anything, mock.Anything).Run(func(ctx context.Context, workload shardclient.CollectionWorkLoad) {
qn := mocks.NewMockQueryNodeClient(s.T())
qn.EXPECT().GetHighlight(mock.Anything, mock.Anything).Return(
&querypb.GetHighlightResponse{
Status: merr.Success(),
Results: []*querypb.HighlightResult{},
}, nil)
workload.Exec(ctx, 0, qn, "test_chan")
}).Return(nil)
_, err = op.run(ctx, s.span, &milvuspb.SearchResults{
Results: &schemapb.SearchResultData{
TopK: 3,
Topks: []int64{1},
FieldsData: []*schemapb.FieldData{{
FieldName: testVarCharField,
FieldId: 100,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{
Data: []string{"match text"},
},
},
},
},
}},
},
})
s.NoError(err)
}
func (s *SearchPipelineSuite) TestSearchPipeline() {
collectionName := "test"
task := &searchTask{
@ -296,8 +389,11 @@ func (s *SearchPipelineSuite) TestSearchPipeline() {
queryInfos: []*planpb.QueryInfo{{}},
translatedOutputFields: []string{"intField"},
}
pipeline, err := newPipeline(searchPipe, task)
s.NoError(err)
pipeline.AddNodes(task, filterFieldNode)
sr := genTestSearchResultData(2, 10, schemapb.DataType_Int64, "intField", 101, false)
results, storageCost, err := pipeline.Run(context.Background(), s.span, []*internalpb.SearchResults{sr}, segcore.StorageCost{ScannedRemoteBytes: 100, ScannedTotalBytes: 250})
s.NoError(err)
@ -364,6 +460,8 @@ func (s *SearchPipelineSuite) TestSearchPipelineWithRequery() {
pipeline, err := newPipeline(searchWithRequeryPipe, task)
s.NoError(err)
pipeline.AddNodes(task, filterFieldNode)
results, storageCost, err := pipeline.Run(context.Background(), s.span, []*internalpb.SearchResults{
genTestSearchResultData(2, 10, schemapb.DataType_Int64, "intField", 101, false),
}, segcore.StorageCost{ScannedRemoteBytes: 100, ScannedTotalBytes: 200})
@ -435,6 +533,7 @@ func (s *SearchPipelineSuite) TestSearchWithRerankPipe() {
pipeline, err := newPipeline(searchWithRerankPipe, task)
s.NoError(err)
pipeline.AddNodes(task, filterFieldNode)
searchResults := genTestSearchResultData(2, 10, schemapb.DataType_Int64, "intField", 101, false)
results, _, err := pipeline.Run(context.Background(), s.span, []*internalpb.SearchResults{searchResults}, segcore.StorageCost{})
@ -518,6 +617,7 @@ func (s *SearchPipelineSuite) TestSearchWithRerankRequeryPipe() {
pipeline, err := newPipeline(searchWithRerankRequeryPipe, task)
s.NoError(err)
pipeline.AddNodes(task, filterFieldNode)
searchResults := genTestSearchResultData(2, 10, schemapb.DataType_Int64, "intField", 101, false)
results, storageCost, err := pipeline.Run(context.Background(), s.span, []*internalpb.SearchResults{searchResults}, segcore.StorageCost{})
@ -551,6 +651,7 @@ func (s *SearchPipelineSuite) TestHybridSearchPipe() {
pipeline, err := newPipeline(hybridSearchPipe, task)
s.NoError(err)
pipeline.AddNodes(task, filterFieldNode)
f1 := genTestSearchResultData(2, 10, schemapb.DataType_Int64, "intField", 101, true)
f2 := genTestSearchResultData(2, 10, schemapb.DataType_Int64, "intField", 101, true)
@ -663,6 +764,7 @@ func (s *SearchPipelineSuite) TestHybridSearchWithRequeryAndRerankByDataPipe() {
pipeline, err := newPipeline(hybridSearchWithRequeryAndRerankByFieldDataPipe, task)
s.NoError(err)
pipeline.AddNodes(task, filterFieldNode)
d1 := genTestSearchResultData(2, 10, schemapb.DataType_Int64, "intField", 101, true)
d2 := genTestSearchResultData(2, 10, schemapb.DataType_Int64, "intField", 101, true)
@ -704,6 +806,7 @@ func (s *SearchPipelineSuite) TestHybridSearchWithRequeryPipe() {
pipeline, err := newPipeline(hybridSearchWithRequeryPipe, task)
s.NoError(err)
pipeline.AddNodes(task, filterFieldNode)
d1 := genTestSearchResultData(2, 10, schemapb.DataType_Int64, "intField", 101, true)
d2 := genTestSearchResultData(2, 10, schemapb.DataType_Int64, "intField", 101, true)

View File

@ -64,6 +64,7 @@ const (
StrictCastKey = "strict_cast"
RankGroupScorer = "rank_group_scorer"
AnnsFieldKey = "anns_field"
AnalyzerKey = "analyzer_name"
TopKKey = "topk"
NQKey = "nq"
MetricTypeKey = common.MetricTypeKey
@ -116,6 +117,7 @@ const (
ListResourceGroupsTaskName = "ListResourceGroupsTask"
DescribeResourceGroupTaskName = "DescribeResourceGroupTask"
RunAnalyzerTaskName = "RunAnalyzer"
HighlightTaskName = "Highlight"
CreateDatabaseTaskName = "CreateCollectionTask"
DropDatabaseTaskName = "DropDatabaseTaskName"
@ -3146,6 +3148,95 @@ func (t *RunAnalyzerTask) PostExecute(ctx context.Context) error {
return nil
}
// git highlight after search
type HighlightTask struct {
baseTask
Condition
*querypb.GetHighlightRequest
ctx context.Context
collectionName string
collectionID typeutil.UniqueID
dbName string
lb shardclient.LBPolicy
result *querypb.GetHighlightResponse
}
func (t *HighlightTask) TraceCtx() context.Context {
return t.ctx
}
func (t *HighlightTask) ID() UniqueID {
return t.Base.MsgID
}
func (t *HighlightTask) SetID(uid UniqueID) {
t.Base.MsgID = uid
}
func (t *HighlightTask) Name() string {
return HighlightTaskName
}
func (t *HighlightTask) Type() commonpb.MsgType {
return t.Base.MsgType
}
func (t *HighlightTask) BeginTs() Timestamp {
return t.Base.Timestamp
}
func (t *HighlightTask) EndTs() Timestamp {
return t.Base.Timestamp
}
func (t *HighlightTask) SetTs(ts Timestamp) {
t.Base.Timestamp = ts
}
func (t *HighlightTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_RunAnalyzer
t.Base.SourceID = paramtable.GetNodeID()
return nil
}
func (t *HighlightTask) PreExecute(ctx context.Context) error {
return nil
}
func (t *HighlightTask) getHighlightOnShardleader(ctx context.Context, nodeID int64, qn types.QueryNodeClient, channel string) error {
t.GetHighlightRequest.Channel = channel
resp, err := qn.GetHighlight(ctx, t.GetHighlightRequest)
if err != nil {
return err
}
if err := merr.Error(resp.GetStatus()); err != nil {
return err
}
t.result = resp
return nil
}
func (t *HighlightTask) Execute(ctx context.Context) error {
err := t.lb.ExecuteOneChannel(ctx, shardclient.CollectionWorkLoad{
Db: t.dbName,
CollectionName: t.collectionName,
CollectionID: t.collectionID,
Nq: int64(len(t.GetTopks()) * len(t.GetTasks())),
Exec: t.getHighlightOnShardleader,
})
return err
}
func (t *HighlightTask) PostExecute(ctx context.Context) error {
return nil
}
// isIgnoreGrowing is used to check if the request should ignore growing
func isIgnoreGrowing(params []*commonpb.KeyValuePair) (bool, error) {
for _, kv := range params {

View File

@ -17,6 +17,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/parser/planparserv2"
"github.com/milvus-io/milvus/internal/proxy/accesslog"
"github.com/milvus-io/milvus/internal/proxy/shardclient"
@ -80,8 +81,8 @@ type searchTask struct {
translatedOutputFields []string
userOutputFields []string
userDynamicFields []string
resultBuf *typeutil.ConcurrentSet[*internalpb.SearchResults]
highlightTasks []*highlightTask
resultBuf *typeutil.ConcurrentSet[*internalpb.SearchResults]
partitionIDsSet *typeutil.ConcurrentSet[UniqueID]
@ -470,12 +471,16 @@ func (t *searchTask) initAdvancedSearchRequest(ctx context.Context) error {
}
// set analyzer name for sub search
analyzer, err := funcutil.GetAttrByKeyFromRepeatedKV("analyzer_name", subReq.GetSearchParams())
analyzer, err := funcutil.GetAttrByKeyFromRepeatedKV(AnalyzerKey, subReq.GetSearchParams())
if err == nil {
internalSubReq.AnalyzerName = analyzer
}
internalSubReq.FieldId = queryInfo.GetQueryFieldId()
if err := t.addHighlightTask(t.request.GetHighlighter(), internalSubReq.GetMetricType(), internalSubReq.FieldId, subReq.GetPlaceholderGroup(), internalSubReq.GetAnalyzerName()); err != nil {
return err
}
queryFieldIDs = append(queryFieldIDs, internalSubReq.FieldId)
// set PartitionIDs for sub search
if t.partitionKeyMode {
@ -562,6 +567,119 @@ func (t *searchTask) fillResult() {
t.result.CollectionName = t.collectionName
}
func (t *searchTask) getBM25SearchTexts(placeholder []byte) ([]string, error) {
pb := &commonpb.PlaceholderGroup{}
proto.Unmarshal(placeholder, pb)
if len(pb.Placeholders) != 1 || len(pb.Placeholders[0].Values) == 0 {
return nil, merr.WrapErrParameterInvalidMsg("please provide varchar/text for BM25 Function based search")
}
holder := pb.Placeholders[0]
if holder.Type != commonpb.PlaceholderType_VarChar {
return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("please provide varchar/text for BM25 Function based search, got %s", holder.Type.String()))
}
texts := funcutil.GetVarCharFromPlaceholder(holder)
return texts, nil
}
func (t *searchTask) createLexicalHighlighter(highlighter *commonpb.Highlighter, metricType string, annsField int64, placeholder []byte, analyzerName string) error {
task := &highlightTask{
HighlightTask: &querypb.HighlightTask{},
}
params := funcutil.KeyValuePair2Map(highlighter.GetParams())
if metricType != metric.BM25 {
return merr.WrapErrParameterInvalidMsg(`Search highlight only support with metric type "BM25" but was: %s`, t.SearchRequest.GetMetricType())
}
function, ok := getBM25FunctionOfAnnsField(annsField, t.schema.GetFunctions())
if !ok {
return merr.WrapErrServiceInternal(`Search with highlight failed, input field of BM25 annsField not found`)
}
task.FieldId = function.InputFieldIds[0]
task.FieldName = function.InputFieldNames[0]
if value, ok := params[HighlightSearchTextKey]; ok {
enable, err := strconv.ParseBool(value)
if err != nil {
return merr.WrapErrParameterInvalidMsg("unmarshal highlight_search_data as bool failed: %v", err)
}
// now only support highlight with search
// so skip if highlight search not enable.
if !enable {
return nil
}
}
// set pre_tags and post_tags
if value, ok := params[PreTagsKey]; ok {
tags := []string{}
if err := json.Unmarshal([]byte(value), &tags); err != nil {
return merr.WrapErrParameterInvalidMsg("unmarshal pre_tags as string list failed: %v", err)
}
if len(tags) == 0 {
return merr.WrapErrParameterInvalidMsg("pre_tags cannot be empty list")
}
task.preTags = make([][]byte, len(tags))
for i, tag := range tags {
task.preTags[i] = []byte(tag)
}
} else {
task.preTags = [][]byte{[]byte(DefaultPreTag)}
}
if value, ok := params[PostTagsKey]; ok {
tags := []string{}
if err := json.Unmarshal([]byte(value), &tags); err != nil {
return merr.WrapErrParameterInvalidMsg("unmarshal post_tags as string list failed: %v", err)
}
if len(tags) == 0 {
return merr.WrapErrParameterInvalidMsg("post_tags cannot be empty list")
}
task.postTags = make([][]byte, len(tags))
for i, tag := range tags {
task.postTags[i] = []byte(tag)
}
} else {
task.postTags = [][]byte{[]byte(DefaultPostTag)}
}
// set bm25 search text as query texts
texts, err := t.getBM25SearchTexts(placeholder)
if err != nil {
return err
}
task.Texts = texts
task.SearchTextNum = int64(len(texts))
if analyzerName != "" {
task.AnalyzerNames = []string{}
for i := 0; i < len(texts); i++ {
task.AnalyzerNames = append(task.AnalyzerNames, analyzerName)
}
}
t.highlightTasks = append(t.highlightTasks, task)
return nil
}
func (t *searchTask) addHighlightTask(highlighter *commonpb.Highlighter, metricType string, annsField int64, placeholder []byte, analyzerName string) error {
if highlighter == nil {
return nil
}
switch highlighter.GetType() {
case commonpb.HighlightType_Lexical:
return t.createLexicalHighlighter(highlighter, metricType, annsField, placeholder, analyzerName)
default:
return merr.WrapErrParameterInvalidMsg("unsupported highlight type: %v", highlighter.GetType())
}
}
func (t *searchTask) initSearchRequest(ctx context.Context) error {
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "init search request")
defer sp.End()
@ -639,11 +757,14 @@ func (t *searchTask) initSearchRequest(ctx context.Context) error {
t.SearchRequest.GroupSize = queryInfo.GroupSize
if t.SearchRequest.MetricType == metric.BM25 {
analyzer, err := funcutil.GetAttrByKeyFromRepeatedKV("analyzer_name", t.request.GetSearchParams())
analyzer, err := funcutil.GetAttrByKeyFromRepeatedKV(AnalyzerKey, t.request.GetSearchParams())
if err == nil {
t.SearchRequest.AnalyzerName = analyzer
}
}
if err := t.addHighlightTask(t.request.GetHighlighter(), t.SearchRequest.MetricType, t.SearchRequest.FieldId, t.request.GetPlaceholderGroup(), t.SearchRequest.GetAnalyzerName()); err != nil {
return err
}
if embedding.HasNonBM25Functions(t.schema.CollectionSchema.Functions, []int64{queryInfo.GetQueryFieldId()}) {
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Search-call-function-udf")
@ -819,7 +940,7 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
t.isRecallEvaluation = isRecallEvaluation
// call pipeline
pipeline, err := newBuiltInPipeline(t)
pipeline, err := newSearchPipeline(t)
if err != nil {
log.Warn("Faild to create post process pipeline")
return err

View File

@ -4884,3 +4884,193 @@ func TestSearchTask_InitSearchRequestWithStructArrayFields(t *testing.T) {
})
}
}
func TestSearchTask_AddHighlightTask(t *testing.T) {
paramtable.Init()
// Create a schema with BM25 function
schema := &schemapb.CollectionSchema{
Name: "test_highlight_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "text_field",
DataType: schemapb.DataType_VarChar,
},
{
FieldID: 101,
Name: "sparse_field",
DataType: schemapb.DataType_SparseFloatVector,
},
},
Functions: []*schemapb.FunctionSchema{
{
Name: "bm25_func",
Type: schemapb.FunctionType_BM25,
InputFieldNames: []string{"text_field"},
InputFieldIds: []int64{100},
OutputFieldNames: []string{"sparse_field"},
OutputFieldIds: []int64{101},
},
},
}
placeholder := &commonpb.PlaceholderGroup{
Placeholders: []*commonpb.PlaceholderValue{{
Type: commonpb.PlaceholderType_VarChar,
Values: [][]byte{[]byte("test_str")},
}},
}
placeholderBytes, err := proto.Marshal(placeholder)
require.NoError(t, err)
t.Run("lexical highlight success", func(t *testing.T) {
task := &searchTask{
schema: &schemaInfo{
CollectionSchema: schema,
},
}
highlighter := &commonpb.Highlighter{
Type: commonpb.HighlightType_Lexical,
Params: []*commonpb.KeyValuePair{{Key: HighlightSearchTextKey, Value: "true"}},
}
err := task.addHighlightTask(highlighter, metric.BM25, 101, placeholderBytes, "")
assert.NoError(t, err)
assert.Equal(t, 1, len(task.highlightTasks))
assert.Equal(t, int64(100), task.highlightTasks[0].FieldId)
assert.Equal(t, "text_field", task.highlightTasks[0].FieldName)
})
t.Run("Lexical highlight with custom tags", func(t *testing.T) {
task := &searchTask{
schema: &schemaInfo{
CollectionSchema: schema,
},
}
highlighter := &commonpb.Highlighter{
Type: commonpb.HighlightType_Lexical,
Params: []*commonpb.KeyValuePair{{Key: HighlightSearchTextKey, Value: "true"}, {Key: "pre_tags", Value: `["<b>"]`}, {Key: "post_tags", Value: `["</b>"]`}},
}
err := task.addHighlightTask(highlighter, metric.BM25, 101, placeholderBytes, "")
assert.NoError(t, err)
assert.Equal(t, 1, len(task.highlightTasks))
assert.Equal(t, 1, len(task.highlightTasks[0].preTags))
assert.Equal(t, []byte("<b>"), task.highlightTasks[0].preTags[0])
assert.Equal(t, 1, len(task.highlightTasks[0].postTags))
assert.Equal(t, []byte("</b>"), task.highlightTasks[0].postTags[0])
})
t.Run("lexical highlight with wrong metric type", func(t *testing.T) {
task := &searchTask{
schema: &schemaInfo{
CollectionSchema: schema,
},
SearchRequest: &internalpb.SearchRequest{},
request: &milvuspb.SearchRequest{},
}
highlighter := &commonpb.Highlighter{
Type: commonpb.HighlightType_Lexical,
Params: []*commonpb.KeyValuePair{{Key: HighlightSearchTextKey, Value: "true"}},
}
err := task.addHighlightTask(highlighter, metric.L2, 101, placeholderBytes, "")
assert.Error(t, err)
})
t.Run("lexical highlight with invalid pre_tags type", func(t *testing.T) {
task := &searchTask{
schema: &schemaInfo{
CollectionSchema: schema,
},
}
highlighter := &commonpb.Highlighter{
Type: commonpb.HighlightType_Lexical,
Params: []*commonpb.KeyValuePair{{Key: HighlightSearchTextKey, Value: "true"}, {Key: "pre_tags", Value: "not_a_list"}},
}
err := task.addHighlightTask(highlighter, metric.BM25, 101, placeholderBytes, "")
assert.Error(t, err)
})
t.Run("default lexical highlight but not BM25 field", func(t *testing.T) {
schemaWithoutBM25 := &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "vector_field",
DataType: schemapb.DataType_FloatVector,
},
},
}
task := &searchTask{
schema: &schemaInfo{
CollectionSchema: schemaWithoutBM25,
},
}
highlighter := &commonpb.Highlighter{
Type: commonpb.HighlightType_Lexical,
Params: []*commonpb.KeyValuePair{{Key: HighlightSearchTextKey, Value: "true"}},
}
err := task.addHighlightTask(highlighter, metric.BM25, 100, placeholderBytes, "")
assert.Error(t, err)
})
t.Run("highlight without highlight search text", func(t *testing.T) {
task := &searchTask{
schema: &schemaInfo{
CollectionSchema: schema,
},
}
highlighter := &commonpb.Highlighter{
Type: commonpb.HighlightType_Lexical,
Params: []*commonpb.KeyValuePair{{Key: HighlightSearchTextKey, Value: "false"}},
}
err := task.addHighlightTask(highlighter, metric.BM25, 101, placeholderBytes, "")
assert.NoError(t, err)
})
t.Run("highlight with invalid highlight search key", func(t *testing.T) {
task := &searchTask{
schema: &schemaInfo{
CollectionSchema: schema,
},
}
highlighter := &commonpb.Highlighter{
Type: commonpb.HighlightType_Lexical,
Params: []*commonpb.KeyValuePair{{Key: HighlightSearchTextKey, Value: "invalid"}},
}
err := task.addHighlightTask(highlighter, metric.BM25, 101, placeholderBytes, "")
assert.Error(t, err)
})
t.Run("highlight with unknown type", func(t *testing.T) {
task := &searchTask{
schema: &schemaInfo{
CollectionSchema: schema,
},
}
highlighter := &commonpb.Highlighter{
Type: 4,
Params: []*commonpb.KeyValuePair{{Key: HighlightSearchTextKey, Value: "true"}},
}
err := task.addHighlightTask(highlighter, metric.BM25, 101, placeholderBytes, "")
assert.Error(t, err)
})
}

View File

@ -39,6 +39,7 @@ import (
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proxy/shardclient"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/util/function/embedding"
"github.com/milvus-io/milvus/pkg/v2/common"
@ -5497,3 +5498,237 @@ func TestAlterCollection_AllowInsertAutoID_AutoIDFalse(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, merr.Code(merr.ErrParameterInvalid), merr.Code(err))
}
func TestHighlightTask(t *testing.T) {
paramtable.Init()
ctx := context.Background()
t.Run("basic methods", func(t *testing.T) {
task := &HighlightTask{
ctx: ctx,
GetHighlightRequest: &querypb.GetHighlightRequest{
Base: commonpbutil.NewMsgBase(),
},
Condition: NewTaskCondition(ctx),
}
t.Run("traceCtx", func(t *testing.T) {
traceCtx := task.TraceCtx()
assert.NotNil(t, traceCtx)
assert.Equal(t, ctx, traceCtx)
})
t.Run("id", func(t *testing.T) {
id := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
task.SetID(id)
assert.Equal(t, id, task.ID())
})
t.Run("name", func(t *testing.T) {
assert.Equal(t, HighlightTaskName, task.Name())
})
t.Run("type", func(t *testing.T) {
assert.Equal(t, commonpb.MsgType_Undefined, task.Type())
})
t.Run("ts", func(t *testing.T) {
ts := Timestamp(time.Now().UnixNano())
task.SetTs(ts)
assert.Equal(t, ts, task.BeginTs())
assert.Equal(t, ts, task.EndTs())
})
})
t.Run("OnEnqueue", func(t *testing.T) {
task := &HighlightTask{
ctx: ctx,
GetHighlightRequest: &querypb.GetHighlightRequest{},
Condition: NewTaskCondition(ctx),
}
err := task.OnEnqueue()
assert.NoError(t, err)
assert.NotNil(t, task.Base)
assert.Equal(t, commonpb.MsgType_RunAnalyzer, task.Base.MsgType)
assert.Equal(t, paramtable.GetNodeID(), task.Base.SourceID)
})
t.Run("PreExecute", func(t *testing.T) {
task := &HighlightTask{
ctx: ctx,
GetHighlightRequest: &querypb.GetHighlightRequest{
Base: commonpbutil.NewMsgBase(),
},
Condition: NewTaskCondition(ctx),
}
err := task.PreExecute(ctx)
assert.NoError(t, err)
})
t.Run("getHighlightOnShardleader success", func(t *testing.T) {
task := &HighlightTask{
ctx: ctx,
GetHighlightRequest: &querypb.GetHighlightRequest{
Base: commonpbutil.NewMsgBase(),
Topks: []int64{10},
Tasks: []*querypb.HighlightTask{
{},
},
},
Condition: NewTaskCondition(ctx),
}
mockQN := mocks.NewMockQueryNodeClient(t)
expectedResp := &querypb.GetHighlightResponse{
Status: merr.Success(),
Results: []*querypb.HighlightResult{
{
Fragments: []*querypb.HighlightFragment{
{
StartOffset: 0,
EndOffset: 10,
Offsets: []int64{0, 5, 5, 10},
},
},
},
},
}
mockQN.EXPECT().GetHighlight(mock.Anything, mock.Anything).Return(expectedResp, nil)
err := task.getHighlightOnShardleader(ctx, 1, mockQN, "test_channel")
assert.NoError(t, err)
assert.NotNil(t, task.result)
assert.Equal(t, expectedResp, task.result)
assert.Equal(t, "test_channel", task.GetHighlightRequest.Channel)
})
t.Run("getHighlightOnShardleader rpc error", func(t *testing.T) {
task := &HighlightTask{
ctx: ctx,
GetHighlightRequest: &querypb.GetHighlightRequest{
Base: commonpbutil.NewMsgBase(),
},
Condition: NewTaskCondition(ctx),
}
mockQN := mocks.NewMockQueryNodeClient(t)
mockQN.EXPECT().GetHighlight(mock.Anything, mock.Anything).Return(nil, errors.New("rpc error"))
err := task.getHighlightOnShardleader(ctx, 1, mockQN, "test_channel")
assert.Error(t, err)
assert.Contains(t, err.Error(), "rpc error")
})
t.Run("getHighlightOnShardleader status error", func(t *testing.T) {
task := &HighlightTask{
ctx: ctx,
GetHighlightRequest: &querypb.GetHighlightRequest{
Base: commonpbutil.NewMsgBase(),
},
Condition: NewTaskCondition(ctx),
}
mockQN := mocks.NewMockQueryNodeClient(t)
expectedResp := &querypb.GetHighlightResponse{
Status: merr.Status(errors.New("status error")),
}
mockQN.EXPECT().GetHighlight(mock.Anything, mock.Anything).Return(expectedResp, nil)
err := task.getHighlightOnShardleader(ctx, 1, mockQN, "test_channel")
assert.Error(t, err)
})
t.Run("Execute success", func(t *testing.T) {
task := &HighlightTask{
ctx: ctx,
collectionName: "test_collection",
collectionID: 100,
dbName: "default",
GetHighlightRequest: &querypb.GetHighlightRequest{
Base: commonpbutil.NewMsgBase(),
Topks: []int64{10, 20},
Tasks: []*querypb.HighlightTask{
{},
{},
},
},
Condition: NewTaskCondition(ctx),
}
mockLB := shardclient.NewMockLBPolicy(t)
mockQN := mocks.NewMockQueryNodeClient(t)
expectedResp := &querypb.GetHighlightResponse{
Status: merr.Success(),
Results: []*querypb.HighlightResult{
{
Fragments: []*querypb.HighlightFragment{
{
StartOffset: 0,
EndOffset: 10,
Offsets: []int64{0, 5, 5, 10},
},
},
},
},
}
mockQN.EXPECT().GetHighlight(mock.Anything, mock.Anything).Return(expectedResp, nil)
mockLB.EXPECT().ExecuteOneChannel(mock.Anything, mock.Anything).Run(
func(ctx context.Context, workload shardclient.CollectionWorkLoad) {
assert.Equal(t, "default", workload.Db)
assert.Equal(t, "test_collection", workload.CollectionName)
assert.Equal(t, int64(100), workload.CollectionID)
assert.Equal(t, int64(4), workload.Nq) // len(topks) * len(tasks) = 2 * 2 = 4
err := workload.Exec(ctx, 1, mockQN, "test_channel")
assert.NoError(t, err)
},
).Return(nil)
task.lb = mockLB
err := task.Execute(ctx)
assert.NoError(t, err)
assert.NotNil(t, task.result)
})
t.Run("Execute lb error", func(t *testing.T) {
task := &HighlightTask{
ctx: ctx,
collectionName: "test_collection",
collectionID: 100,
dbName: "default",
GetHighlightRequest: &querypb.GetHighlightRequest{
Base: commonpbutil.NewMsgBase(),
},
Condition: NewTaskCondition(ctx),
}
mockLB := shardclient.NewMockLBPolicy(t)
mockLB.EXPECT().ExecuteOneChannel(mock.Anything, mock.Anything).Return(errors.New("lb error"))
task.lb = mockLB
err := task.Execute(ctx)
assert.Error(t, err)
assert.Contains(t, err.Error(), "lb error")
})
t.Run("PostExecute", func(t *testing.T) {
task := &HighlightTask{
ctx: ctx,
GetHighlightRequest: &querypb.GetHighlightRequest{
Base: commonpbutil.NewMsgBase(),
},
Condition: NewTaskCondition(ctx),
}
err := task.PostExecute(ctx)
assert.NoError(t, err)
})
}

View File

@ -3025,3 +3025,9 @@ func genFunctionFields(ctx context.Context, insertMsg *msgstream.InsertMsg, sche
}
return nil
}
func getBM25FunctionOfAnnsField(fieldID int64, functions []*schemapb.FunctionSchema) (*schemapb.FunctionSchema, bool) {
return lo.Find(functions, func(function *schemapb.FunctionSchema) bool {
return function.GetType() == schemapb.FunctionType_BM25 && function.OutputFieldIds[0] == fieldID
})
}

View File

@ -324,6 +324,65 @@ func (_c *MockQueryNodeServer_GetDataDistribution_Call) RunAndReturn(run func(co
return _c
}
// GetHighlight provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNodeServer) GetHighlight(_a0 context.Context, _a1 *querypb.GetHighlightRequest) (*querypb.GetHighlightResponse, error) {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for GetHighlight")
}
var r0 *querypb.GetHighlightResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.GetHighlightRequest) (*querypb.GetHighlightResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.GetHighlightRequest) *querypb.GetHighlightResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*querypb.GetHighlightResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.GetHighlightRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryNodeServer_GetHighlight_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetHighlight'
type MockQueryNodeServer_GetHighlight_Call struct {
*mock.Call
}
// GetHighlight is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *querypb.GetHighlightRequest
func (_e *MockQueryNodeServer_Expecter) GetHighlight(_a0 interface{}, _a1 interface{}) *MockQueryNodeServer_GetHighlight_Call {
return &MockQueryNodeServer_GetHighlight_Call{Call: _e.mock.On("GetHighlight", _a0, _a1)}
}
func (_c *MockQueryNodeServer_GetHighlight_Call) Run(run func(_a0 context.Context, _a1 *querypb.GetHighlightRequest)) *MockQueryNodeServer_GetHighlight_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*querypb.GetHighlightRequest))
})
return _c
}
func (_c *MockQueryNodeServer_GetHighlight_Call) Return(_a0 *querypb.GetHighlightResponse, _a1 error) *MockQueryNodeServer_GetHighlight_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryNodeServer_GetHighlight_Call) RunAndReturn(run func(context.Context, *querypb.GetHighlightRequest) (*querypb.GetHighlightResponse, error)) *MockQueryNodeServer_GetHighlight_Call {
_c.Call.Return(run)
return _c
}
// GetMetrics provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNodeServer) GetMetrics(_a0 context.Context, _a1 *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
ret := _m.Called(_a0, _a1)

View File

@ -104,6 +104,7 @@ type ShardDelegator interface {
// analyzer
RunAnalyzer(ctx context.Context, req *querypb.RunAnalyzerRequest) ([]*milvuspb.AnalyzerResult, error)
GetHighlight(ctx context.Context, req *querypb.GetHighlightRequest) ([]*querypb.HighlightResult, error)
// control
Serviceable() bool

View File

@ -29,6 +29,7 @@ import (
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
@ -1001,3 +1002,57 @@ func (sd *shardDelegator) DropIndex(ctx context.Context, req *querypb.DropIndexR
}
return nil
}
func (sd *shardDelegator) GetHighlight(ctx context.Context, req *querypb.GetHighlightRequest) ([]*querypb.HighlightResult, error) {
result := []*querypb.HighlightResult{}
for _, task := range req.GetTasks() {
if len(task.GetTexts()) != int(task.GetSearchTextNum()+task.GetCorpusTextNum()) {
return nil, errors.Errorf("package highlight texts error, num of texts not equal the expected num %d:%d", len(task.GetTexts()), task.GetSearchTextNum()+task.GetCorpusTextNum())
}
analyzer, ok := sd.analyzerRunners[task.GetFieldId()]
if !ok {
return nil, merr.WrapErrParameterInvalidMsg("get highlight failed, the highlight field not found, %s:%d", task.GetFieldName(), task.GetFieldId())
}
topks := req.GetTopks()
var results [][]*milvuspb.AnalyzerToken
var err error
if len(analyzer.GetInputFields()) == 1 {
results, err = analyzer.BatchAnalyze(true, false, task.GetTexts())
if err != nil {
return nil, err
}
} else if len(analyzer.GetInputFields()) == 2 {
// use analyzer names if analyzer need two input field
results, err = analyzer.BatchAnalyze(true, false, task.GetTexts(), task.GetAnalyzerNames())
if err != nil {
return nil, err
}
}
// analyze result of search text
searchResults := results[0:task.SearchTextNum]
// analyze result of corpus text
corpusResults := results[task.SearchTextNum:]
corpusIdx := 0
for i, tokens := range searchResults {
tokenSet := typeutil.NewSet[string]()
for _, token := range tokens {
tokenSet.Insert(token.GetToken())
}
for j := 0; j < int(topks[i]); j++ {
offsets := []int64{}
for _, token := range corpusResults[corpusIdx] {
if tokenSet.Contain(token.GetToken()) {
offsets = append(offsets, token.GetStartOffset(), token.GetEndOffset())
}
}
result = append(result, &querypb.HighlightResult{Fragments: []*querypb.HighlightFragment{{StartOffset: 0, EndOffset: int64(len(task.Texts[int(task.SearchTextNum)+corpusIdx])), Offsets: offsets}}})
corpusIdx++
}
}
}
return result, nil
}

View File

@ -1596,6 +1596,159 @@ func (s *DelegatorSuite) TestRunAnalyzer() {
})
}
func (s *DelegatorSuite) TestGetHighlight() {
ctx := context.Background()
s.TestCreateDelegatorWithFunction()
s.Run("field analyzer not exist", func() {
_, err := s.delegator.GetHighlight(ctx, &querypb.GetHighlightRequest{
Topks: []int64{1},
Tasks: []*querypb.HighlightTask{
{
FieldId: 999, // non-existent field
},
},
})
s.Require().Error(err)
})
s.Run("normal highlight with single analyzer", func() {
s.manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "text",
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{{Key: "analyzer_params", Value: "{}"}},
},
{
FieldID: 101,
Name: "sparse",
DataType: schemapb.DataType_SparseFloatVector,
},
},
Functions: []*schemapb.FunctionSchema{{
Type: schemapb.FunctionType_BM25,
InputFieldNames: []string{"text"},
InputFieldIds: []int64{100},
OutputFieldNames: []string{"sparse"},
OutputFieldIds: []int64{101},
}},
}, nil, &querypb.LoadMetaInfo{SchemaVersion: tsoutil.ComposeTSByTime(time.Now(), 0)})
s.ResetDelegator()
result, err := s.delegator.GetHighlight(ctx, &querypb.GetHighlightRequest{
Topks: []int64{2},
Tasks: []*querypb.HighlightTask{
{
FieldId: 100,
Texts: []string{"test", "this is a test document", "another test case"},
SearchTextNum: 1,
CorpusTextNum: 2,
},
},
})
s.Require().NoError(err)
s.Require().Equal(2, len(result))
// Check that we got highlight results
s.Require().NotNil(result[0].Fragments)
s.Require().NotNil(result[1].Fragments)
})
s.Run("highlight with multi analyzer", func() {
s.manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "text",
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{{Key: "multi_analyzer_params", Value: `{
"by_field": "analyzer",
"analyzers": {
"standard": {},
"default": {}
}
}`}},
},
{
FieldID: 101,
Name: "sparse",
DataType: schemapb.DataType_SparseFloatVector,
},
{
FieldID: 102,
Name: "analyzer",
DataType: schemapb.DataType_VarChar,
},
},
Functions: []*schemapb.FunctionSchema{{
Type: schemapb.FunctionType_BM25,
InputFieldNames: []string{"text"},
InputFieldIds: []int64{100},
OutputFieldNames: []string{"sparse"},
OutputFieldIds: []int64{101},
}},
}, nil, &querypb.LoadMetaInfo{SchemaVersion: tsoutil.ComposeTSByTime(time.Now(), 0)})
s.ResetDelegator()
// two target with two analyzer
result, err := s.delegator.GetHighlight(ctx, &querypb.GetHighlightRequest{
Topks: []int64{1, 1},
Tasks: []*querypb.HighlightTask{
{
FieldId: 100,
Texts: []string{"test1", "test2", "this is a test1 document", "another test2 case"},
AnalyzerNames: []string{"default", "standard", "default", "default"},
SearchTextNum: 2,
CorpusTextNum: 2,
},
},
})
s.Require().NoError(err)
s.Require().Equal(2, len(result))
})
s.Run("empty target texts", func() {
s.manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "text",
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{{Key: "analyzer_params", Value: "{}"}},
},
{
FieldID: 101,
Name: "sparse",
DataType: schemapb.DataType_SparseFloatVector,
},
},
Functions: []*schemapb.FunctionSchema{{
Type: schemapb.FunctionType_BM25,
InputFieldNames: []string{"text"},
InputFieldIds: []int64{100},
OutputFieldNames: []string{"sparse"},
OutputFieldIds: []int64{101},
}},
}, nil, &querypb.LoadMetaInfo{SchemaVersion: tsoutil.ComposeTSByTime(time.Now(), 0)})
s.ResetDelegator()
result, err := s.delegator.GetHighlight(ctx, &querypb.GetHighlightRequest{
Topks: []int64{1},
Tasks: []*querypb.HighlightTask{
{
FieldId: 100,
Texts: []string{"test document"},
SearchTextNum: 0,
CorpusTextNum: 1,
},
},
})
s.Require().NoError(err)
s.Require().NotNil(result)
})
}
// TestDelegatorLifetimeIntegration tests the integration of lifetime state checks with main delegator methods
func (s *DelegatorSuite) TestDelegatorLifetimeIntegration() {
sd := s.delegator.(*shardDelegator)

View File

@ -289,6 +289,65 @@ func (_c *MockShardDelegator_GetDeleteBufferSize_Call) RunAndReturn(run func() (
return _c
}
// GetHighlight provides a mock function with given fields: ctx, req
func (_m *MockShardDelegator) GetHighlight(ctx context.Context, req *querypb.GetHighlightRequest) ([]*querypb.HighlightResult, error) {
ret := _m.Called(ctx, req)
if len(ret) == 0 {
panic("no return value specified for GetHighlight")
}
var r0 []*querypb.HighlightResult
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.GetHighlightRequest) ([]*querypb.HighlightResult, error)); ok {
return rf(ctx, req)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.GetHighlightRequest) []*querypb.HighlightResult); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*querypb.HighlightResult)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.GetHighlightRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockShardDelegator_GetHighlight_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetHighlight'
type MockShardDelegator_GetHighlight_Call struct {
*mock.Call
}
// GetHighlight is a helper method to define mock.On call
// - ctx context.Context
// - req *querypb.GetHighlightRequest
func (_e *MockShardDelegator_Expecter) GetHighlight(ctx interface{}, req interface{}) *MockShardDelegator_GetHighlight_Call {
return &MockShardDelegator_GetHighlight_Call{Call: _e.mock.On("GetHighlight", ctx, req)}
}
func (_c *MockShardDelegator_GetHighlight_Call) Run(run func(ctx context.Context, req *querypb.GetHighlightRequest)) *MockShardDelegator_GetHighlight_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*querypb.GetHighlightRequest))
})
return _c
}
func (_c *MockShardDelegator_GetHighlight_Call) Return(_a0 []*querypb.HighlightResult, _a1 error) *MockShardDelegator_GetHighlight_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockShardDelegator_GetHighlight_Call) RunAndReturn(run func(context.Context, *querypb.GetHighlightRequest) ([]*querypb.HighlightResult, error)) *MockShardDelegator_GetHighlight_Call {
_c.Call.Return(run)
return _c
}
// GetPartitionStatsVersions provides a mock function with given fields: ctx
func (_m *MockShardDelegator) GetPartitionStatsVersions(ctx context.Context) map[int64]int64 {
ret := _m.Called(ctx)

View File

@ -1741,3 +1741,36 @@ func (node *QueryNode) DropIndex(ctx context.Context, req *querypb.DropIndexRequ
return merr.Success(), nil
}
func (node *QueryNode) GetHighlight(ctx context.Context, req *querypb.GetHighlightRequest) (*querypb.GetHighlightResponse, error) {
// check node healthy
if err := node.lifetime.Add(merr.IsHealthy); err != nil {
return &querypb.GetHighlightResponse{
Status: merr.Status(err),
}, nil
}
defer node.lifetime.Done()
// get delegator
sd, ok := node.delegators.Get(req.GetChannel())
if !ok {
err := merr.WrapErrChannelNotFound(req.GetChannel())
log.Warn("GetHighlight failed, failed to get shard delegator", zap.Error(err))
return &querypb.GetHighlightResponse{
Status: merr.Status(err),
}, nil
}
results, err := sd.GetHighlight(ctx, req)
if err != nil {
log.Warn("GetHighlight failed, delegator run failed", zap.Error(err))
return &querypb.GetHighlightResponse{
Status: merr.Status(err),
}, nil
}
return &querypb.GetHighlightResponse{
Status: merr.Success(),
Results: results,
}, nil
}

View File

@ -2471,6 +2471,49 @@ func (suite *ServiceSuite) TestValidateAnalyzer() {
})
}
func (suite *ServiceSuite) TestGetHighlight() {
ctx := context.Background()
suite.Run("node not healthy", func() {
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
defer suite.node.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := suite.node.GetHighlight(ctx, &querypb.GetHighlightRequest{
Channel: suite.vchannel,
Topks: []int64{10},
})
suite.NoError(err)
suite.Error(merr.Error(resp.GetStatus()))
})
suite.Run("normal case", func() {
delegator := &delegator.MockShardDelegator{}
suite.node.delegators.Insert(suite.vchannel, delegator)
defer suite.node.delegators.GetAndRemove(suite.vchannel)
delegator.EXPECT().GetHighlight(mock.Anything, mock.Anything).Return(
[]*querypb.HighlightResult{}, nil)
resp, err := suite.node.GetHighlight(ctx, &querypb.GetHighlightRequest{
Channel: suite.vchannel,
Topks: []int64{1, 1},
Tasks: []*querypb.HighlightTask{
{
FieldName: "text_field",
FieldId: 100,
Texts: []string{"target text", "target text2", "text", "text2"},
AnalyzerNames: []string{"standard", "standard", "standard", "standard"},
SearchTextNum: 2,
CorpusTextNum: 2,
},
},
})
suite.NoError(err)
suite.NoError(merr.Error(resp.GetStatus()))
suite.NotNil(resp.Results)
})
}
func TestQueryNodeService(t *testing.T) {
wal := mock_streaming.NewMockWALAccesser(t)
local := mock_streaming.NewMockLocal(t)

View File

@ -146,6 +146,10 @@ func (m *GrpcQueryNodeClient) ValidateAnalyzer(ctx context.Context, in *querypb.
return &commonpb.Status{}, m.Err
}
func (m *GrpcQueryNodeClient) GetHighlight(ctx context.Context, in *querypb.GetHighlightRequest, opts ...grpc.CallOption) (*querypb.GetHighlightResponse, error) {
return &querypb.GetHighlightResponse{}, m.Err
}
func (m *GrpcQueryNodeClient) Close() error {
return m.Err
}

View File

@ -160,6 +160,10 @@ func (qn *qnServerWrapper) RunAnalyzer(ctx context.Context, in *querypb.RunAnaly
return qn.QueryNode.RunAnalyzer(ctx, in)
}
func (qn *qnServerWrapper) GetHighlight(ctx context.Context, in *querypb.GetHighlightRequest, _ ...grpc.CallOption) (*querypb.GetHighlightResponse, error) {
return qn.QueryNode.GetHighlight(ctx, in)
}
func (qn *qnServerWrapper) DropIndex(ctx context.Context, in *querypb.DropIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
return qn.QueryNode.DropIndex(ctx, in)
}

View File

@ -174,6 +174,8 @@ service QueryNode {
rpc UpdateSchema(UpdateSchemaRequest) returns (common.Status) {}
rpc RunAnalyzer(RunAnalyzerRequest) returns(milvus.RunAnalyzerResponse){}
rpc GetHighlight(GetHighlightRequest) returns (GetHighlightResponse){}
rpc DropIndex(DropIndexRequest) returns (common.Status) {}
rpc ValidateAnalyzer(ValidateAnalyzerRequest) returns(common.Status){}
}
@ -1012,6 +1014,42 @@ message ValidateAnalyzerRequest{
repeated AnalyzerInfo analyzer_infos = 2;
}
// HighlightTask fetch highlight for all queries at one field
// len(texts) == search_text_num + corpus_text_num
message HighlightTask{
string field_name = 1;
int64 field_id = 2;
repeated string texts = 3;
repeated string analyzer_names = 4; // used if field with multi-analyzer
int64 search_text_num = 5;
int64 corpus_text_num = 6;
}
message GetHighlightRequest{
common.MsgBase base = 1;
string channel = 2;
repeated int64 topks = 3;
repeated HighlightTask tasks=4; // one task for one field
}
// start_offset and end_offset are fragment offset in the original text
// number of offsets always be 2 * number of highlight terms in the fragment
message HighlightFragment{
int64 start_offset = 1;
int64 end_offset = 2;
repeated int64 offsets = 3;
}
message HighlightResult{
repeated HighlightFragment fragments = 2;
}
message GetHighlightResponse{
common.Status status = 1;
repeated HighlightResult results = 2;
}
message ListLoadedSegmentsRequest {
common.MsgBase base = 1;
}

File diff suppressed because it is too large Load Diff

View File

@ -1552,6 +1552,7 @@ const (
QueryNode_DeleteBatch_FullMethodName = "/milvus.proto.query.QueryNode/DeleteBatch"
QueryNode_UpdateSchema_FullMethodName = "/milvus.proto.query.QueryNode/UpdateSchema"
QueryNode_RunAnalyzer_FullMethodName = "/milvus.proto.query.QueryNode/RunAnalyzer"
QueryNode_GetHighlight_FullMethodName = "/milvus.proto.query.QueryNode/GetHighlight"
QueryNode_DropIndex_FullMethodName = "/milvus.proto.query.QueryNode/DropIndex"
QueryNode_ValidateAnalyzer_FullMethodName = "/milvus.proto.query.QueryNode/ValidateAnalyzer"
)
@ -1590,6 +1591,7 @@ type QueryNodeClient interface {
DeleteBatch(ctx context.Context, in *DeleteBatchRequest, opts ...grpc.CallOption) (*DeleteBatchResponse, error)
UpdateSchema(ctx context.Context, in *UpdateSchemaRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
RunAnalyzer(ctx context.Context, in *RunAnalyzerRequest, opts ...grpc.CallOption) (*milvuspb.RunAnalyzerResponse, error)
GetHighlight(ctx context.Context, in *GetHighlightRequest, opts ...grpc.CallOption) (*GetHighlightResponse, error)
DropIndex(ctx context.Context, in *DropIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
ValidateAnalyzer(ctx context.Context, in *ValidateAnalyzerRequest, opts ...grpc.CallOption) (*commonpb.Status, error)
}
@ -1891,6 +1893,15 @@ func (c *queryNodeClient) RunAnalyzer(ctx context.Context, in *RunAnalyzerReques
return out, nil
}
func (c *queryNodeClient) GetHighlight(ctx context.Context, in *GetHighlightRequest, opts ...grpc.CallOption) (*GetHighlightResponse, error) {
out := new(GetHighlightResponse)
err := c.cc.Invoke(ctx, QueryNode_GetHighlight_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *queryNodeClient) DropIndex(ctx context.Context, in *DropIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) {
out := new(commonpb.Status)
err := c.cc.Invoke(ctx, QueryNode_DropIndex_FullMethodName, in, out, opts...)
@ -1943,6 +1954,7 @@ type QueryNodeServer interface {
DeleteBatch(context.Context, *DeleteBatchRequest) (*DeleteBatchResponse, error)
UpdateSchema(context.Context, *UpdateSchemaRequest) (*commonpb.Status, error)
RunAnalyzer(context.Context, *RunAnalyzerRequest) (*milvuspb.RunAnalyzerResponse, error)
GetHighlight(context.Context, *GetHighlightRequest) (*GetHighlightResponse, error)
DropIndex(context.Context, *DropIndexRequest) (*commonpb.Status, error)
ValidateAnalyzer(context.Context, *ValidateAnalyzerRequest) (*commonpb.Status, error)
}
@ -2032,6 +2044,9 @@ func (UnimplementedQueryNodeServer) UpdateSchema(context.Context, *UpdateSchemaR
func (UnimplementedQueryNodeServer) RunAnalyzer(context.Context, *RunAnalyzerRequest) (*milvuspb.RunAnalyzerResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RunAnalyzer not implemented")
}
func (UnimplementedQueryNodeServer) GetHighlight(context.Context, *GetHighlightRequest) (*GetHighlightResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetHighlight not implemented")
}
func (UnimplementedQueryNodeServer) DropIndex(context.Context, *DropIndexRequest) (*commonpb.Status, error) {
return nil, status.Errorf(codes.Unimplemented, "method DropIndex not implemented")
}
@ -2542,6 +2557,24 @@ func _QueryNode_RunAnalyzer_Handler(srv interface{}, ctx context.Context, dec fu
return interceptor(ctx, in, info, handler)
}
func _QueryNode_GetHighlight_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetHighlightRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(QueryNodeServer).GetHighlight(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: QueryNode_GetHighlight_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(QueryNodeServer).GetHighlight(ctx, req.(*GetHighlightRequest))
}
return interceptor(ctx, in, info, handler)
}
func _QueryNode_DropIndex_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DropIndexRequest)
if err := dec(in); err != nil {
@ -2685,6 +2718,10 @@ var QueryNode_ServiceDesc = grpc.ServiceDesc{
MethodName: "RunAnalyzer",
Handler: _QueryNode_RunAnalyzer_Handler,
},
{
MethodName: "GetHighlight",
Handler: _QueryNode_GetHighlight_Handler,
},
{
MethodName: "DropIndex",
Handler: _QueryNode_DropIndex_Handler,