enhance: [2.5] support run analyzer by loaded collection field (#42119)

relate: https://github.com/milvus-io/milvus/issues/42094
pr: https://github.com/milvus-io/milvus/pull/42113

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
aoiasd 2025-05-29 10:26:30 +08:00 committed by GitHub
parent 4a05180f88
commit 198ff1f150
30 changed files with 2064 additions and 771 deletions
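Before the file diffs, a minimal usage sketch (my illustration, not part of this commit) of what the new path enables: when CollectionName and FieldName are set on a milvuspb.RunAnalyzerRequest, the proxy resolves the analyzer bound to that field of the loaded collection and runs the request on a shard delegator instead of building an ad-hoc tokenizer from AnalyzerParams. The collection "docs" and field "text" are hypothetical, and the snippet assumes it sits alongside the internal/proxy code shown below.

// Sketch: analyze text with the analyzer bound to a loaded collection field.
func runAnalyzerOnLoadedField(ctx context.Context, node *Proxy) error {
	resp, err := node.RunAnalyzer(ctx, &milvuspb.RunAnalyzerRequest{
		DbName:         "default",
		CollectionName: "docs", // hypothetical loaded collection
		FieldName:      "text", // hypothetical BM25 input field
		Placeholder:    [][]byte{[]byte("Milvus supports full text search")},
		WithDetail:     true,
	})
	if err != nil {
		return err
	}
	if err := merr.Error(resp.GetStatus()); err != nil {
		return err
	}
	for _, result := range resp.GetResults() {
		for _, token := range result.GetTokens() {
			fmt.Println(token.GetToken())
		}
	}
	return nil
}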

go.mod

@ -21,7 +21,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.18.0
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.13-0.20250520065018-13f9a20ffaad
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.13-0.20250527033022-8bab5b3dea49
github.com/minio/minio-go/v7 v7.0.73
github.com/panjf2000/ants/v2 v2.11.3 // indirect
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81

go.sum

@ -636,6 +636,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.13-0.20250520065018-13f9a20ffaad h1:tkSvshW0g1nbf7gPIsku838FFvrlucnPtAXYBiGuTAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.13-0.20250520065018-13f9a20ffaad/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.13-0.20250527033022-8bab5b3dea49 h1:ERaE86WBbccuyqDpdb+JUFniZ7mSdhw6FK7m18PX+t8=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.13-0.20250527033022-8bab5b3dea49/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=


@ -19,10 +19,10 @@ package datacoord
import (
"fmt"
"github.com/cockroachdb/errors"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/v2/log"


@ -361,3 +361,14 @@ func (c *Client) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchReques
return client.DeleteBatch(ctx, req)
})
}
func (c *Client) RunAnalyzer(ctx context.Context, req *querypb.RunAnalyzerRequest, _ ...grpc.CallOption) (*milvuspb.RunAnalyzerResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(c.nodeID),
)
return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*milvuspb.RunAnalyzerResponse, error) {
return client.RunAnalyzer(ctx, req)
})
}


@ -394,3 +394,7 @@ func (s *Server) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commo
func (s *Server) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
return s.querynode.DeleteBatch(ctx, req)
}
func (s *Server) RunAnalyzer(ctx context.Context, req *querypb.RunAnalyzerRequest) (*milvuspb.RunAnalyzerResponse, error) {
return s.querynode.RunAnalyzer(ctx, req)
}


@ -1248,6 +1248,65 @@ func (_c *MockQueryNode_ReleaseSegments_Call) RunAndReturn(run func(context.Cont
return _c
}
// RunAnalyzer provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNode) RunAnalyzer(_a0 context.Context, _a1 *querypb.RunAnalyzerRequest) (*milvuspb.RunAnalyzerResponse, error) {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for RunAnalyzer")
}
var r0 *milvuspb.RunAnalyzerResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.RunAnalyzerRequest) (*milvuspb.RunAnalyzerResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.RunAnalyzerRequest) *milvuspb.RunAnalyzerResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.RunAnalyzerResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.RunAnalyzerRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryNode_RunAnalyzer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RunAnalyzer'
type MockQueryNode_RunAnalyzer_Call struct {
*mock.Call
}
// RunAnalyzer is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *querypb.RunAnalyzerRequest
func (_e *MockQueryNode_Expecter) RunAnalyzer(_a0 interface{}, _a1 interface{}) *MockQueryNode_RunAnalyzer_Call {
return &MockQueryNode_RunAnalyzer_Call{Call: _e.mock.On("RunAnalyzer", _a0, _a1)}
}
func (_c *MockQueryNode_RunAnalyzer_Call) Run(run func(_a0 context.Context, _a1 *querypb.RunAnalyzerRequest)) *MockQueryNode_RunAnalyzer_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*querypb.RunAnalyzerRequest))
})
return _c
}
func (_c *MockQueryNode_RunAnalyzer_Call) Return(_a0 *milvuspb.RunAnalyzerResponse, _a1 error) *MockQueryNode_RunAnalyzer_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryNode_RunAnalyzer_Call) RunAndReturn(run func(context.Context, *querypb.RunAnalyzerRequest) (*milvuspb.RunAnalyzerResponse, error)) *MockQueryNode_RunAnalyzer_Call {
_c.Call.Return(run)
return _c
}
// Search provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNode) Search(_a0 context.Context, _a1 *querypb.SearchRequest) (*internalpb.SearchResults, error) {
ret := _m.Called(_a0, _a1)


@ -1408,6 +1408,80 @@ func (_c *MockQueryNodeClient_ReleaseSegments_Call) RunAndReturn(run func(contex
return _c
}
// RunAnalyzer provides a mock function with given fields: ctx, in, opts
func (_m *MockQueryNodeClient) RunAnalyzer(ctx context.Context, in *querypb.RunAnalyzerRequest, opts ...grpc.CallOption) (*milvuspb.RunAnalyzerResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for RunAnalyzer")
}
var r0 *milvuspb.RunAnalyzerResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.RunAnalyzerRequest, ...grpc.CallOption) (*milvuspb.RunAnalyzerResponse, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.RunAnalyzerRequest, ...grpc.CallOption) *milvuspb.RunAnalyzerResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.RunAnalyzerResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.RunAnalyzerRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryNodeClient_RunAnalyzer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RunAnalyzer'
type MockQueryNodeClient_RunAnalyzer_Call struct {
*mock.Call
}
// RunAnalyzer is a helper method to define mock.On call
// - ctx context.Context
// - in *querypb.RunAnalyzerRequest
// - opts ...grpc.CallOption
func (_e *MockQueryNodeClient_Expecter) RunAnalyzer(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryNodeClient_RunAnalyzer_Call {
return &MockQueryNodeClient_RunAnalyzer_Call{Call: _e.mock.On("RunAnalyzer",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockQueryNodeClient_RunAnalyzer_Call) Run(run func(ctx context.Context, in *querypb.RunAnalyzerRequest, opts ...grpc.CallOption)) *MockQueryNodeClient_RunAnalyzer_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*querypb.RunAnalyzerRequest), variadicArgs...)
})
return _c
}
func (_c *MockQueryNodeClient_RunAnalyzer_Call) Return(_a0 *milvuspb.RunAnalyzerResponse, _a1 error) *MockQueryNodeClient_RunAnalyzer_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryNodeClient_RunAnalyzer_Call) RunAndReturn(run func(context.Context, *querypb.RunAnalyzerRequest, ...grpc.CallOption) (*milvuspb.RunAnalyzerResponse, error)) *MockQueryNodeClient_RunAnalyzer_Call {
_c.Call.Return(run)
return _c
}
// Search provides a mock function with given fields: ctx, in, opts
func (_m *MockQueryNodeClient) Search(ctx context.Context, in *querypb.SearchRequest, opts ...grpc.CallOption) (*internalpb.SearchResults, error) {
_va := make([]interface{}, len(opts))


@ -7042,14 +7042,12 @@ func (node *Proxy) OperatePrivilegeGroup(ctx context.Context, req *milvuspb.Oper
return result, nil
}
func (node *Proxy) RunAnalyzer(ctx context.Context, req *milvuspb.RunAnalyzerRequest) (*milvuspb.RunAnalyzerResponse, error) {
// TODO: use collection analyzer when collection name and field name are not empty
func (node *Proxy) runAnalyzer(req *milvuspb.RunAnalyzerRequest) ([]*milvuspb.AnalyzerResult, error) {
tokenizer, err := ctokenizer.NewTokenizer(req.GetAnalyzerParams())
if err != nil {
return &milvuspb.RunAnalyzerResponse{
Status: merr.Status(err),
}, nil
return nil, err
}
defer tokenizer.Destroy()
results := make([]*milvuspb.AnalyzerResult, len(req.GetPlaceholder()))
@ -7075,9 +7073,90 @@ func (node *Proxy) RunAnalyzer(ctx context.Context, req *milvuspb.RunAnalyzerReq
results[i].Tokens = append(results[i].Tokens, token)
}
}
return results, nil
}
func (node *Proxy) RunAnalyzer(ctx context.Context, req *milvuspb.RunAnalyzerRequest) (*milvuspb.RunAnalyzerResponse, error) {
ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-RunAnalyzer")
defer sp.End()
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.RunAnalyzerResponse{
Status: merr.Status(err),
}, nil
}
if len(req.Placeholder) == 0 {
return &milvuspb.RunAnalyzerResponse{
Status: merr.Status(nil),
Results: make([]*milvuspb.AnalyzerResult, 0),
}, nil
}
if req.GetCollectionName() == "" {
results, err := node.runAnalyzer(req)
if err != nil {
return &milvuspb.RunAnalyzerResponse{
Status: merr.Status(err),
}, nil
}
return &milvuspb.RunAnalyzerResponse{
Status: merr.Status(nil),
Results: results,
}, nil
}
if err := validateRunAnalyzer(req); err != nil {
return &milvuspb.RunAnalyzerResponse{
Status: merr.Status(merr.WrapErrAsInputError(err)),
}, nil
}
method := "RunAnalyzer"
task := &RunAnalyzerTask{
ctx: ctx,
lb: node.lbPolicy,
Condition: NewTaskCondition(ctx),
RunAnalyzerRequest: req,
}
if err := node.sched.dqQueue.Enqueue(task); err != nil {
log.Warn(
rpcFailedToEnqueue(method),
zap.Error(err),
)
metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
method,
metrics.AbandonLabel,
req.GetDbName(),
req.GetCollectionName(),
).Inc()
return &milvuspb.RunAnalyzerResponse{
Status: merr.Status(err),
}, nil
}
if err := task.WaitToFinish(); err != nil {
log.Warn(
rpcFailedToWaitToFinish(method),
zap.Error(err),
)
metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
method,
metrics.FailLabel,
req.GetDbName(),
req.GetCollectionName(),
).Inc()
return &milvuspb.RunAnalyzerResponse{
Status: merr.Status(err),
}, nil
}
return task.result, nil
}


@ -57,6 +57,7 @@ import (
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
"github.com/milvus-io/milvus/pkg/v2/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/v2/util/resource"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
func TestProxy_InvalidateCollectionMetaCache_remove_stream(t *testing.T) {
@ -2196,22 +2197,81 @@ func TestAlterCollectionReplicateProperty(t *testing.T) {
}
func TestRunAnalyzer(t *testing.T) {
paramtable.Init()
ctx := context.Background()
cache := globalMetaCache
globalMetaCache = nil
defer func() { globalMetaCache = cache }()
p := &Proxy{}
// run analyzer with default params
tsoAllocatorIns := newMockTsoAllocator()
sched, err := newTaskScheduler(ctx, tsoAllocatorIns, p.factory)
require.NoError(t, err)
sched.Start()
defer sched.Close()
p.sched = sched
t.Run("run analyzer err with node not healthy", func(t *testing.T) {
p.UpdateStateCode(commonpb.StateCode_Abnormal)
resp, err := p.RunAnalyzer(context.Background(), &milvuspb.RunAnalyzerRequest{
Placeholder: [][]byte{[]byte("test doc")},
})
require.NoError(t, err)
require.Error(t, merr.Error(resp.GetStatus()))
})
p.UpdateStateCode(commonpb.StateCode_Healthy)
t.Run("run analyzer with default params", func(t *testing.T) {
resp, err := p.RunAnalyzer(context.Background(), &milvuspb.RunAnalyzerRequest{
Placeholder: [][]byte{[]byte("test doc")},
})
require.NoError(t, err)
require.NoError(t, merr.Error(resp.GetStatus()))
assert.Equal(t, len(resp.GetResults()[0].GetTokens()), 2)
})
// run analyzer with invalid params
resp, err = p.RunAnalyzer(context.Background(), &milvuspb.RunAnalyzerRequest{
t.Run("run analyzer with invalid params", func(t *testing.T) {
resp, err := p.RunAnalyzer(context.Background(), &milvuspb.RunAnalyzerRequest{
Placeholder: [][]byte{[]byte("test doc")},
AnalyzerParams: "invalid json",
})
require.NoError(t, err)
require.Error(t, merr.Error(resp.GetStatus()))
})
t.Run("run analyzer from loaded collection field", func(t *testing.T) {
mockCache := NewMockCache(t)
globalMetaCache = mockCache
fieldMap := &typeutil.ConcurrentMap[string, int64]{}
fieldMap.Insert("test_text", 100)
mockCache.EXPECT().GetCollectionID(mock.Anything, "default", "test_collection").Return(1, nil)
mockCache.EXPECT().GetCollectionSchema(mock.Anything, "default", "test_collection").Return(&schemaInfo{
CollectionSchema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{{
FieldID: 100,
Name: "test_text",
}},
},
fieldMap: fieldMap,
}, nil)
lb := NewMockLBPolicy(t)
lb.EXPECT().ExecuteOneChannel(mock.Anything, mock.Anything).Return(nil)
p.lbPolicy = lb
resp, err := p.RunAnalyzer(context.Background(), &milvuspb.RunAnalyzerRequest{
Placeholder: [][]byte{[]byte("test doc")},
CollectionName: "test_collection",
FieldName: "test_text",
})
require.NoError(t, err)
require.NoError(t, merr.Error(resp.GetStatus()))
})
}
func Test_GetSegmentsInfo(t *testing.T) {


@ -17,6 +17,7 @@ package proxy
import (
"context"
"fmt"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
@ -55,6 +56,7 @@ type CollectionWorkLoad struct {
type LBPolicy interface {
Execute(ctx context.Context, workload CollectionWorkLoad) error
ExecuteOneChannel(ctx context.Context, workload CollectionWorkLoad) error
ExecuteWithRetry(ctx context.Context, workload ChannelWorkload) error
UpdateCostMetrics(node int64, cost *internalpb.CostAggregation)
Start(ctx context.Context)
@ -259,6 +261,36 @@ func (lb *LBPolicyImpl) Execute(ctx context.Context, workload CollectionWorkLoad
return wg.Wait()
}
// ExecuteOneChannel executes the workload on any single channel of the collection
func (lb *LBPolicyImpl) ExecuteOneChannel(ctx context.Context, workload CollectionWorkLoad) error {
dml2leaders, err := lb.GetShardLeaders(ctx, workload.db, workload.collectionName, workload.collectionID, true)
if err != nil {
log.Ctx(ctx).Warn("failed to get shards", zap.Error(err))
return err
}
// let every request retry at least twice, so it can retry again after the shard leader cache is updated
for k, v := range dml2leaders {
channel := k
nodes := v
channelRetryTimes := lb.retryOnReplica
if len(nodes) > 0 {
channelRetryTimes *= len(nodes)
}
return lb.ExecuteWithRetry(ctx, ChannelWorkload{
db: workload.db,
collectionName: workload.collectionName,
collectionID: workload.collectionID,
channel: channel,
shardLeaders: nodes,
nq: workload.nq,
exec: workload.exec,
retryTimes: uint(channelRetryTimes),
})
}
return fmt.Errorf("no acitvate sheard leader exist for collection: %s", workload.collectionName)
}
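A usage sketch (my assumption, mirroring the test added later in this commit): the caller hands ExecuteOneChannel a CollectionWorkLoad whose exec callback issues a single RPC against whichever shard leader the policy selects.

// Sketch: run one RPC on any single shard leader of a loaded collection.
err := lb.ExecuteOneChannel(ctx, CollectionWorkLoad{
	db:             "default",
	collectionName: "docs",       // hypothetical collection
	collectionID:   collectionID, // resolved via globalMetaCache beforehand
	nq:             1,
	exec: func(ctx context.Context, nodeID UniqueID, qn types.QueryNodeClient, channel string) error {
		// e.g. forward a querypb.RunAnalyzerRequest to qn for this channel
		return nil
	},
})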
func (lb *LBPolicyImpl) UpdateCostMetrics(node int64, cost *internalpb.CostAggregation) {
lb.getBalancer().UpdateCostMetrics(node, cost)
}


@ -378,6 +378,41 @@ func (s *LBPolicySuite) TestExecuteWithRetry() {
s.True(merr.IsCanceledOrTimeout(err))
}
func (s *LBPolicySuite) TestExecuteOneChannel() {
ctx := context.Background()
mockErr := errors.New("mock error")
// test all channel success
s.mgr.EXPECT().GetClient(mock.Anything, mock.Anything).Return(s.qn, nil)
s.lbBalancer.EXPECT().RegisterNodeInfo(mock.Anything)
s.lbBalancer.EXPECT().SelectNode(mock.Anything, mock.Anything, mock.Anything).Return(1, nil)
s.lbBalancer.EXPECT().CancelWorkload(mock.Anything, mock.Anything)
err := s.lbPolicy.ExecuteOneChannel(ctx, CollectionWorkLoad{
db: dbName,
collectionName: s.collectionName,
collectionID: s.collectionID,
nq: 1,
exec: func(ctx context.Context, ui UniqueID, qn types.QueryNodeClient, channel string) error {
return nil
},
})
s.NoError(err)
// test get shard leader failed
s.qc.ExpectedCalls = nil
globalMetaCache.DeprecateShardCache(dbName, s.collectionName)
s.qc.EXPECT().GetShardLeaders(mock.Anything, mock.Anything).Return(nil, mockErr)
err = s.lbPolicy.ExecuteOneChannel(ctx, CollectionWorkLoad{
db: dbName,
collectionName: s.collectionName,
collectionID: s.collectionID,
nq: 1,
exec: func(ctx context.Context, ui UniqueID, qn types.QueryNodeClient, channel string) error {
return nil
},
})
s.ErrorIs(err, mockErr)
}
func (s *LBPolicySuite) TestExecute() {
ctx := context.Background()
mockErr := errors.New("mock error")


@ -101,6 +101,53 @@ func (_c *MockLBPolicy_Execute_Call) RunAndReturn(run func(context.Context, Coll
return _c
}
// ExecuteOneChannel provides a mock function with given fields: ctx, workload
func (_m *MockLBPolicy) ExecuteOneChannel(ctx context.Context, workload CollectionWorkLoad) error {
ret := _m.Called(ctx, workload)
if len(ret) == 0 {
panic("no return value specified for ExecuteOneChannel")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, CollectionWorkLoad) error); ok {
r0 = rf(ctx, workload)
} else {
r0 = ret.Error(0)
}
return r0
}
// MockLBPolicy_ExecuteOneChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ExecuteOneChannel'
type MockLBPolicy_ExecuteOneChannel_Call struct {
*mock.Call
}
// ExecuteOneChannel is a helper method to define mock.On call
// - ctx context.Context
// - workload CollectionWorkLoad
func (_e *MockLBPolicy_Expecter) ExecuteOneChannel(ctx interface{}, workload interface{}) *MockLBPolicy_ExecuteOneChannel_Call {
return &MockLBPolicy_ExecuteOneChannel_Call{Call: _e.mock.On("ExecuteOneChannel", ctx, workload)}
}
func (_c *MockLBPolicy_ExecuteOneChannel_Call) Run(run func(ctx context.Context, workload CollectionWorkLoad)) *MockLBPolicy_ExecuteOneChannel_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(CollectionWorkLoad))
})
return _c
}
func (_c *MockLBPolicy_ExecuteOneChannel_Call) Return(_a0 error) *MockLBPolicy_ExecuteOneChannel_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockLBPolicy_ExecuteOneChannel_Call) RunAndReturn(run func(context.Context, CollectionWorkLoad) error) *MockLBPolicy_ExecuteOneChannel_Call {
_c.Call.Return(run)
return _c
}
// ExecuteWithRetry provides a mock function with given fields: ctx, workload
func (_m *MockLBPolicy) ExecuteWithRetry(ctx context.Context, workload ChannelWorkload) error {
ret := _m.Called(ctx, workload)


@ -104,6 +104,7 @@ const (
TransferReplicaTaskName = "TransferReplicaTask"
ListResourceGroupsTaskName = "ListResourceGroupsTask"
DescribeResourceGroupTaskName = "DescribeResourceGroupTask"
RunAnalyzerTaskName = "RunAnalyzer"
CreateDatabaseTaskName = "CreateCollectionTask"
DropDatabaseTaskName = "DropDatabaseTaskName"
@ -2802,6 +2803,125 @@ func (t *ListResourceGroupsTask) PostExecute(ctx context.Context) error {
return nil
}
type RunAnalyzerTask struct {
baseTask
Condition
*milvuspb.RunAnalyzerRequest
ctx context.Context
collectionID typeutil.UniqueID
fieldID typeutil.UniqueID
dbName string
lb LBPolicy
result *milvuspb.RunAnalyzerResponse
}
func (t *RunAnalyzerTask) TraceCtx() context.Context {
return t.ctx
}
func (t *RunAnalyzerTask) ID() UniqueID {
return t.Base.MsgID
}
func (t *RunAnalyzerTask) SetID(uid UniqueID) {
t.Base.MsgID = uid
}
func (t *RunAnalyzerTask) Name() string {
return RunAnalyzerTaskName
}
func (t *RunAnalyzerTask) Type() commonpb.MsgType {
return t.Base.MsgType
}
func (t *RunAnalyzerTask) BeginTs() Timestamp {
return t.Base.Timestamp
}
func (t *RunAnalyzerTask) EndTs() Timestamp {
return t.Base.Timestamp
}
func (t *RunAnalyzerTask) SetTs(ts Timestamp) {
t.Base.Timestamp = ts
}
func (t *RunAnalyzerTask) OnEnqueue() error {
if t.Base == nil {
t.Base = commonpbutil.NewMsgBase()
}
t.Base.MsgType = commonpb.MsgType_RunAnalyzer
t.Base.SourceID = paramtable.GetNodeID()
return nil
}
func (t *RunAnalyzerTask) PreExecute(ctx context.Context) error {
dbName := t.GetDbName()
if dbName == "" {
dbName = "default"
}
t.dbName = dbName
collID, err := globalMetaCache.GetCollectionID(ctx, dbName, t.GetCollectionName())
if err != nil { // err is not nil if the collection does not exist
return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
}
t.collectionID = collID
schema, err := globalMetaCache.GetCollectionSchema(ctx, dbName, t.GetCollectionName())
if err != nil { // err is not nil if the collection does not exist
return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
}
fieldId, ok := schema.MapFieldID(t.GetFieldName())
if !ok {
return merr.WrapErrAsInputError(merr.WrapErrFieldNotFound(t.GetFieldName()))
}
t.fieldID = fieldId
t.result = &milvuspb.RunAnalyzerResponse{}
return nil
}
func (t *RunAnalyzerTask) runAnalyzerOnShardleader(ctx context.Context, nodeID int64, qn types.QueryNodeClient, channel string) error {
resp, err := qn.RunAnalyzer(ctx, &querypb.RunAnalyzerRequest{
Channel: channel,
FieldId: t.fieldID,
AnalyzerNames: t.GetAnalyzerNames(),
Placeholder: t.GetPlaceholder(),
WithDetail: t.GetWithDetail(),
WithHash: t.GetWithHash(),
})
if err != nil {
return err
}
if err := merr.Error(resp.GetStatus()); err != nil {
return err
}
t.result = resp
return nil
}
func (t *RunAnalyzerTask) Execute(ctx context.Context) error {
err := t.lb.ExecuteOneChannel(ctx, CollectionWorkLoad{
db: t.dbName,
collectionName: t.GetCollectionName(),
collectionID: t.collectionID,
nq: int64(len(t.GetPlaceholder())),
exec: t.runAnalyzerOnShardleader,
})
return err
}
func (t *RunAnalyzerTask) PostExecute(ctx context.Context) error {
return nil
}
// isIgnoreGrowing is used to check if the request should ignore growing
func isIgnoreGrowing(params []*commonpb.KeyValuePair) (bool, error) {
for _, kv := range params {


@ -99,6 +99,26 @@ func isNumber(c uint8) bool {
return true
}
// validateRunAnalyzer checks the run analyzer params when a collection name is set
func validateRunAnalyzer(req *milvuspb.RunAnalyzerRequest) error {
if req.GetAnalyzerParams() != "" {
return fmt.Errorf("run analyzer can't use analyzer params and (collection, field) at the same time")
}
if req.GetFieldName() == "" {
return fmt.Errorf("field name must be set when collection name is set")
}
if req.GetAnalyzerNames() != nil {
if len(req.GetAnalyzerNames()) != 1 && len(req.GetAnalyzerNames()) != len(req.GetPlaceholder()) {
return fmt.Errorf("only one analyzer name for all texts or one analyzer name per text is supported, but got analyzer name num: %d, text num: %d",
len(req.GetAnalyzerNames()), len(req.GetPlaceholder()))
}
}
return nil
}
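Two illustrative calls (my examples, not part of the diff) showing which request shapes pass this validator once a collection name is present:

// Accepted: field name set, one analyzer name broadcast to every text.
err := validateRunAnalyzer(&milvuspb.RunAnalyzerRequest{
	CollectionName: "docs",
	FieldName:      "text",
	Placeholder:    [][]byte{[]byte("first doc"), []byte("second doc")},
	AnalyzerNames:  []string{"default"},
}) // err == nil

// Rejected: ad-hoc analyzer params cannot be combined with (collection, field).
err = validateRunAnalyzer(&milvuspb.RunAnalyzerRequest{
	CollectionName: "docs",
	FieldName:      "text",
	AnalyzerParams: `{"tokenizer": "standard"}`,
}) // err != nil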
func validateMaxQueryResultWindow(offset int64, limit int64) error {
if offset < 0 {
return fmt.Errorf("%s [%d] is invalid, should be gte than 0", OffsetKey, offset)


@ -28,7 +28,7 @@ packages:
github.com/milvus-io/milvus/internal/querycoordv2/task:
interfaces:
Scheduler:
github.com/milvus-io/milvus/pkg/proto/querypb:
github.com/milvus-io/milvus/pkg/v2/proto/querypb:
interfaces:
QueryNodeServer:
config:


@ -1,4 +1,4 @@
// Code generated by mockery v2.46.0. DO NOT EDIT.
// Code generated by mockery v2.53.3. DO NOT EDIT.
package mocks
@ -1067,6 +1067,65 @@ func (_c *MockQueryNodeServer_ReleaseSegments_Call) RunAndReturn(run func(contex
return _c
}
// RunAnalyzer provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNodeServer) RunAnalyzer(_a0 context.Context, _a1 *querypb.RunAnalyzerRequest) (*milvuspb.RunAnalyzerResponse, error) {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for RunAnalyzer")
}
var r0 *milvuspb.RunAnalyzerResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.RunAnalyzerRequest) (*milvuspb.RunAnalyzerResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.RunAnalyzerRequest) *milvuspb.RunAnalyzerResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.RunAnalyzerResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.RunAnalyzerRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryNodeServer_RunAnalyzer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RunAnalyzer'
type MockQueryNodeServer_RunAnalyzer_Call struct {
*mock.Call
}
// RunAnalyzer is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *querypb.RunAnalyzerRequest
func (_e *MockQueryNodeServer_Expecter) RunAnalyzer(_a0 interface{}, _a1 interface{}) *MockQueryNodeServer_RunAnalyzer_Call {
return &MockQueryNodeServer_RunAnalyzer_Call{Call: _e.mock.On("RunAnalyzer", _a0, _a1)}
}
func (_c *MockQueryNodeServer_RunAnalyzer_Call) Run(run func(_a0 context.Context, _a1 *querypb.RunAnalyzerRequest)) *MockQueryNodeServer_RunAnalyzer_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*querypb.RunAnalyzerRequest))
})
return _c
}
func (_c *MockQueryNodeServer_RunAnalyzer_Call) Return(_a0 *milvuspb.RunAnalyzerResponse, _a1 error) *MockQueryNodeServer_RunAnalyzer_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryNodeServer_RunAnalyzer_Call) RunAndReturn(run func(context.Context, *querypb.RunAnalyzerRequest) (*milvuspb.RunAnalyzerResponse, error)) *MockQueryNodeServer_RunAnalyzer_Call {
_c.Call.Return(run)
return _c
}
// Search provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNodeServer) Search(_a0 context.Context, _a1 *querypb.SearchRequest) (*internalpb.SearchResults, error) {
ret := _m.Called(_a0, _a1)


@ -33,6 +33,7 @@ import (
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
@ -94,6 +95,9 @@ type ShardDelegator interface {
UpdateTSafe(ts uint64)
GetTSafe() uint64
// analyzer
RunAnalyzer(ctx context.Context, req *querypb.RunAnalyzerRequest) ([]*milvuspb.AnalyzerResult, error)
// control
Serviceable() bool
Start()
@ -142,10 +146,13 @@ type shardDelegator struct {
growingSegmentLock sync.RWMutex
partitionStatsMut sync.RWMutex
// fieldId -> functionRunner map for search function field
// outputFieldId -> functionRunner map for search function field
functionRunners map[UniqueID]function.FunctionRunner
isBM25Field map[UniqueID]bool
// analyzerFieldID -> analyzerRunner map for run analyzer.
analyzerRunners map[UniqueID]function.Analyzer
// current forward policy
l0ForwardPolicy string
}
@ -914,6 +921,7 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
partitionStats: make(map[UniqueID]*storage.PartitionStatsSnapshot),
excludedSegments: excludedSegments,
functionRunners: make(map[int64]function.FunctionRunner),
analyzerRunners: make(map[UniqueID]function.Analyzer),
isBM25Field: make(map[int64]bool),
l0ForwardPolicy: policy,
}
@ -925,6 +933,8 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
return nil, err
}
sd.functionRunners[tf.OutputFieldIds[0]] = functionRunner
// a BM25 input field can share the same runner between the function and the analyzer.
sd.analyzerRunners[tf.InputFieldIds[0]] = functionRunner.(function.Analyzer)
if tf.GetType() == schemapb.FunctionType_BM25 {
sd.isBM25Field[tf.OutputFieldIds[0]] = true
}
@ -941,3 +951,43 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
log.Info("finish build new shardDelegator")
return sd, nil
}
func (sd *shardDelegator) RunAnalyzer(ctx context.Context, req *querypb.RunAnalyzerRequest) ([]*milvuspb.AnalyzerResult, error) {
analyzer, ok := sd.analyzerRunners[req.GetFieldId()]
if !ok {
return nil, fmt.Errorf("analyzer runner for field %d not exist, now only support run analyzer by field if field was bm25 input field", req.GetFieldId())
}
var result [][]*milvuspb.AnalyzerToken
texts := lo.Map(req.GetPlaceholder(), func(bytes []byte, _ int) string {
return string(bytes)
})
var err error
if len(analyzer.GetInputFields()) == 1 {
result, err = analyzer.BatchAnalyze(req.WithDetail, req.WithHash, texts)
} else {
analyzerNames := req.GetAnalyzerNames()
if len(analyzerNames) == 0 {
return nil, merr.WrapErrAsInputError(fmt.Errorf("analyzer names must be set for multi analyzer"))
}
if len(analyzerNames) == 1 && len(texts) > 1 {
analyzerNames = make([]string, len(texts))
for i := range analyzerNames {
analyzerNames[i] = req.AnalyzerNames[0]
}
}
result, err = analyzer.BatchAnalyze(req.WithDetail, req.WithHash, texts, analyzerNames)
}
if err != nil {
return nil, err
}
return lo.Map(result, func(tokens []*milvuspb.AnalyzerToken, _ int) *milvuspb.AnalyzerResult {
return &milvuspb.AnalyzerResult{
Tokens: tokens,
}
}), nil
}
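At the querypb level, the request a delegator consumes looks like the sketch below (my example; field ID 100 mirrors the delegator test later in this commit and must be a BM25 input field of the loaded schema):

// Sketch: ask a shard delegator to analyze text with a BM25 input field's analyzer.
results, err := sd.RunAnalyzer(ctx, &querypb.RunAnalyzerRequest{
	FieldId:     100, // illustrative BM25 input field ID
	Placeholder: [][]byte{[]byte("test doc")},
	WithDetail:  true,
})
if err == nil {
	for _, result := range results {
		fmt.Println(len(result.GetTokens()))
	}
}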


@ -1189,6 +1189,147 @@ func (s *DelegatorSuite) TestGetStats() {
})
}
func (s *DelegatorSuite) ResetDelegator() {
var err error
s.delegator.Close()
s.delegator, err = NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.loader, &msgstream.MockMqFactory{
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
return s.mq, nil
},
}, 10000, nil, s.chunkManager)
s.Require().NoError(err)
}
func (s *DelegatorSuite) TestRunAnalyzer() {
ctx := context.Background()
s.Run("field analyzer not exist", func() {
_, err := s.delegator.RunAnalyzer(ctx, &querypb.RunAnalyzerRequest{
FieldId: 100,
})
s.Require().Error(err)
})
s.Run("normal analyer", func() {
s.manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "text",
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{{Key: "analyzer_params", Value: "{}"}},
},
{
FieldID: 101,
Name: "sparse",
DataType: schemapb.DataType_SparseFloatVector,
},
},
Functions: []*schemapb.FunctionSchema{{
Type: schemapb.FunctionType_BM25,
InputFieldNames: []string{"text"},
InputFieldIds: []int64{100},
OutputFieldNames: []string{"sparse"},
OutputFieldIds: []int64{101},
}},
}, nil, nil)
s.ResetDelegator()
result, err := s.delegator.RunAnalyzer(ctx, &querypb.RunAnalyzerRequest{
FieldId: 100,
Placeholder: [][]byte{[]byte("test doc")},
})
s.Require().NoError(err)
s.Equal(2, len(result[0].GetTokens()))
})
s.Run("multi analyzer", func() {
s.manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "text",
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{{Key: "multi_analyzer_params", Value: `{
"by_field": "analyzer",
"analyzers": {
"default": {}
}
}`}},
},
{
FieldID: 101,
Name: "sparse",
DataType: schemapb.DataType_SparseFloatVector,
},
{
FieldID: 102,
Name: "analyzer",
DataType: schemapb.DataType_VarChar,
},
},
Functions: []*schemapb.FunctionSchema{{
Type: schemapb.FunctionType_BM25,
InputFieldNames: []string{"text"},
InputFieldIds: []int64{100},
OutputFieldNames: []string{"sparse"},
OutputFieldIds: []int64{101},
}},
}, nil, nil)
s.ResetDelegator()
result, err := s.delegator.RunAnalyzer(ctx, &querypb.RunAnalyzerRequest{
FieldId: 100,
Placeholder: [][]byte{[]byte("test doc"), []byte("test doc2")},
AnalyzerNames: []string{"default"},
})
s.Require().NoError(err)
s.Equal(2, len(result[0].GetTokens()))
s.Equal(2, len(result[1].GetTokens()))
})
s.Run("error multi analyzer but no analyzer name", func() {
s.manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "text",
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{{Key: "multi_analyzer_params", Value: `{
"by_field": "analyzer",
"analyzers": {
"default": {}
}
}`}},
},
{
FieldID: 101,
Name: "sparse",
DataType: schemapb.DataType_SparseFloatVector,
},
{
FieldID: 102,
Name: "analyzer",
DataType: schemapb.DataType_VarChar,
},
},
Functions: []*schemapb.FunctionSchema{{
Type: schemapb.FunctionType_BM25,
InputFieldNames: []string{"text"},
InputFieldIds: []int64{100},
OutputFieldNames: []string{"sparse"},
OutputFieldIds: []int64{101},
}},
}, nil, nil)
s.ResetDelegator()
_, err := s.delegator.RunAnalyzer(ctx, &querypb.RunAnalyzerRequest{
FieldId: 100,
Placeholder: [][]byte{[]byte("test doc")},
})
s.Require().Error(err)
})
}
func TestDelegatorSuite(t *testing.T) {
suite.Run(t, new(DelegatorSuite))
}


@ -5,7 +5,9 @@ package delegator
import (
context "context"
milvuspb "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
internalpb "github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
mock "github.com/stretchr/testify/mock"
msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
@ -767,6 +769,65 @@ func (_c *MockShardDelegator_ReleaseSegments_Call) RunAndReturn(run func(context
return _c
}
// RunAnalyzer provides a mock function with given fields: ctx, req
func (_m *MockShardDelegator) RunAnalyzer(ctx context.Context, req *querypb.RunAnalyzerRequest) ([]*milvuspb.AnalyzerResult, error) {
ret := _m.Called(ctx, req)
if len(ret) == 0 {
panic("no return value specified for RunAnalyzer")
}
var r0 []*milvuspb.AnalyzerResult
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.RunAnalyzerRequest) ([]*milvuspb.AnalyzerResult, error)); ok {
return rf(ctx, req)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.RunAnalyzerRequest) []*milvuspb.AnalyzerResult); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*milvuspb.AnalyzerResult)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.RunAnalyzerRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockShardDelegator_RunAnalyzer_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RunAnalyzer'
type MockShardDelegator_RunAnalyzer_Call struct {
*mock.Call
}
// RunAnalyzer is a helper method to define mock.On call
// - ctx context.Context
// - req *querypb.RunAnalyzerRequest
func (_e *MockShardDelegator_Expecter) RunAnalyzer(ctx interface{}, req interface{}) *MockShardDelegator_RunAnalyzer_Call {
return &MockShardDelegator_RunAnalyzer_Call{Call: _e.mock.On("RunAnalyzer", ctx, req)}
}
func (_c *MockShardDelegator_RunAnalyzer_Call) Run(run func(ctx context.Context, req *querypb.RunAnalyzerRequest)) *MockShardDelegator_RunAnalyzer_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*querypb.RunAnalyzerRequest))
})
return _c
}
func (_c *MockShardDelegator_RunAnalyzer_Call) Return(_a0 []*milvuspb.AnalyzerResult, _a1 error) *MockShardDelegator_RunAnalyzer_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockShardDelegator_RunAnalyzer_Call) RunAndReturn(run func(context.Context, *querypb.RunAnalyzerRequest) ([]*milvuspb.AnalyzerResult, error)) *MockShardDelegator_RunAnalyzer_Call {
_c.Call.Return(run)
return _c
}
// Search provides a mock function with given fields: ctx, req
func (_m *MockShardDelegator) Search(ctx context.Context, req *querypb.SearchRequest) ([]*internalpb.SearchResults, error) {
ret := _m.Called(ctx, req)


@ -1498,6 +1498,31 @@ func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatch
}, nil
}
func (node *QueryNode) RunAnalyzer(ctx context.Context, req *querypb.RunAnalyzerRequest) (*milvuspb.RunAnalyzerResponse, error) {
// get delegator
sd, ok := node.delegators.Get(req.GetChannel())
if !ok {
err := merr.WrapErrChannelNotFound(req.GetChannel())
log.Warn("RunAnalyzer failed, failed to get shard delegator", zap.Error(err))
return &milvuspb.RunAnalyzerResponse{
Status: merr.Status(err),
}, nil
}
// run analyzer
results, err := sd.RunAnalyzer(ctx, req)
if err != nil {
log.Warn("failed to search on delegator", zap.Error(err))
return &milvuspb.RunAnalyzerResponse{
Status: merr.Status(err),
}, nil
}
return &milvuspb.RunAnalyzerResponse{
Status: merr.Status(nil),
Results: results,
}, nil
}
type deleteRequestStringer struct {
*querypb.DeleteRequest
}


@ -17,6 +17,7 @@ package querynodev2
import (
"context"
"fmt"
"io"
"math/rand"
"path"
@ -2348,6 +2349,55 @@ func (suite *ServiceSuite) TestLoadPartition() {
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
}
func (suite *ServiceSuite) TestRunAnalyzer() {
ctx := context.Background()
suite.Run("delegator not exist", func() {
resp, err := suite.node.RunAnalyzer(ctx, &querypb.RunAnalyzerRequest{
Channel: suite.vchannel,
FieldId: 100,
Placeholder: [][]byte{[]byte("test doc")},
})
suite.Require().NoError(err)
suite.Require().Error(merr.Error(resp.GetStatus()))
})
suite.Run("normal run", func() {
delegator := &delegator.MockShardDelegator{}
suite.node.delegators.Insert(suite.vchannel, delegator)
defer suite.node.delegators.GetAndRemove(suite.vchannel)
delegator.EXPECT().RunAnalyzer(mock.Anything, mock.Anything).Return(
[]*milvuspb.AnalyzerResult{}, nil)
_, err := suite.node.RunAnalyzer(ctx, &querypb.RunAnalyzerRequest{
Channel: suite.vchannel,
FieldId: 100,
Placeholder: [][]byte{[]byte("test doc")},
})
suite.Require().NoError(err)
})
suite.Run("run analyzer failed", func() {
delegator := &delegator.MockShardDelegator{}
suite.node.delegators.Insert(suite.vchannel, delegator)
defer suite.node.delegators.GetAndRemove(suite.vchannel)
delegator.EXPECT().RunAnalyzer(mock.Anything, mock.Anything).Return(
nil, fmt.Errorf("mock error"))
resp, err := suite.node.RunAnalyzer(ctx, &querypb.RunAnalyzerRequest{
Channel: suite.vchannel,
FieldId: 100,
Placeholder: [][]byte{[]byte("test doc")},
})
suite.Require().NoError(err)
suite.Require().Error(merr.Error(resp.GetStatus()))
})
}
func TestQueryNodeService(t *testing.T) {
suite.Run(t, new(ServiceSuite))
}


@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/util/ctokenizer"
"github.com/milvus-io/milvus/internal/util/tokenizerapi"
@ -33,6 +34,11 @@ import (
const analyzerParams = "analyzer_params"
type Analyzer interface {
BatchAnalyze(withDetail bool, withHash bool, inputs ...any) ([][]*milvuspb.AnalyzerToken, error)
GetInputFields() []*schemapb.FieldSchema
}
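The delegator drives this interface in two shapes, as sketched below (see shardDelegator.RunAnalyzer earlier in this commit): a plain BM25 field gets only the text column, while a multi-analyzer field also gets a column of analyzer names, one per text.

// texts and analyzerNames are both []string of equal length.
// Single-analyzer field: only the text column is passed.
tokens, err := analyzer.BatchAnalyze(withDetail, withHash, texts)

// Multi-analyzer field: a second column carries one analyzer name per text.
tokens, err = analyzer.BatchAnalyze(withDetail, withHash, texts, analyzerNames)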
// BM25 Runner
// Input: string
// Output: map[uint32]float32
@ -159,6 +165,80 @@ func (v *BM25FunctionRunner) BatchRun(inputs ...any) ([]any, error) {
return []any{buildSparseFloatArray(embedData)}, nil
}
func (v *BM25FunctionRunner) analyze(data []string, dst [][]*milvuspb.AnalyzerToken, withDetail bool, withHash bool) error {
tokenizer, err := v.tokenizer.Clone()
if err != nil {
return err
}
defer tokenizer.Destroy()
for i := 0; i < len(data); i++ {
result := []*milvuspb.AnalyzerToken{}
tokenStream := tokenizer.NewTokenStream(data[i])
defer tokenStream.Destroy()
for tokenStream.Advance() {
var token *milvuspb.AnalyzerToken
if withDetail {
token = tokenStream.DetailedToken()
} else {
token = &milvuspb.AnalyzerToken{
Token: tokenStream.Token(),
}
}
if withHash {
token.Hash = typeutil.HashString2LessUint32(token.GetToken())
}
result = append(result, token)
}
dst[i] = result
}
return nil
}
func (v *BM25FunctionRunner) BatchAnalyze(withDetail bool, withHash bool, inputs ...any) ([][]*milvuspb.AnalyzerToken, error) {
if len(inputs) > 1 {
return nil, errors.New("analyze received should only receive text input column(not set analyzer name)")
}
text, ok := inputs[0].([]string)
if !ok {
return nil, errors.New("batch input not string list")
}
rowNum := len(text)
result := make([][]*milvuspb.AnalyzerToken, rowNum)
wg := sync.WaitGroup{}
errCh := make(chan error, v.concurrency)
for i, j := 0, 0; i < v.concurrency && j < rowNum; i++ {
start := j
end := start + rowNum/v.concurrency
if i < rowNum%v.concurrency {
end += 1
}
wg.Add(1)
go func() {
defer wg.Done()
err := v.analyze(text[start:end], result[start:end], withDetail, withHash)
if err != nil {
errCh <- err
return
}
}()
j = end
}
wg.Wait()
close(errCh)
for err := range errCh {
if err != nil {
return nil, err
}
}
return result, nil
}
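The worker loop above splits the rows into at most `concurrency` contiguous chunks whose sizes differ by at most one. A standalone sketch of that split (illustrative values only, not part of the commit):

package main

import "fmt"

// chunkBounds reproduces the partitioning used by BatchAnalyze:
// rowNum/concurrency rows per chunk, with the first rowNum%concurrency
// chunks taking one extra row.
func chunkBounds(rowNum, concurrency int) [][2]int {
	bounds := [][2]int{}
	for i, j := 0, 0; i < concurrency && j < rowNum; i++ {
		start := j
		end := start + rowNum/concurrency
		if i < rowNum%concurrency {
			end++
		}
		bounds = append(bounds, [2]int{start, end})
		j = end
	}
	return bounds
}

func main() {
	// 10 texts, concurrency 4 -> [[0 3] [3 6] [6 8] [8 10]]
	fmt.Println(chunkBounds(10, 4))
}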
func (v *BM25FunctionRunner) GetSchema() *schemapb.FunctionSchema {
return v.schema
}


@ -23,6 +23,7 @@ import (
"fmt"
"sync"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/util/ctokenizer"
"github.com/milvus-io/milvus/internal/util/tokenizerapi"
@ -224,6 +225,95 @@ func (v *MultiAnalyzerBM25FunctionRunner) BatchRun(inputs ...any) ([]any, error)
return []any{buildSparseFloatArray(embedData)}, nil
}
func (v *MultiAnalyzerBM25FunctionRunner) analyze(data []string, analyzerName []string, dst [][]*milvuspb.AnalyzerToken, withDetail bool, withHash bool) error {
cloneAnalyzers := map[string]tokenizerapi.Tokenizer{}
defer func() {
for _, analyzer := range cloneAnalyzers {
analyzer.Destroy()
}
}()
for i := 0; i < len(data); i++ {
result := []*milvuspb.AnalyzerToken{}
analyzer, err := v.getAnalyzer(analyzerName[i], cloneAnalyzers)
if err != nil {
return err
}
tokenStream := analyzer.NewTokenStream(data[i])
defer tokenStream.Destroy()
for tokenStream.Advance() {
var token *milvuspb.AnalyzerToken
if withDetail {
token = tokenStream.DetailedToken()
} else {
token = &milvuspb.AnalyzerToken{
Token: tokenStream.Token(),
}
}
if withHash {
token.Hash = typeutil.HashString2LessUint32(token.GetToken())
}
result = append(result, token)
}
dst[i] = result
}
return nil
}
func (v *MultiAnalyzerBM25FunctionRunner) BatchAnalyze(withDetail bool, withHash bool, inputs ...any) ([][]*milvuspb.AnalyzerToken, error) {
if len(inputs) != 2 {
return nil, fmt.Errorf("multi analyzer must receive two input columns (text, analyzer_name)")
}
text, ok := inputs[0].([]string)
if !ok {
return nil, fmt.Errorf("multi analyzer text input must be a string list")
}
analyzer, ok := inputs[1].([]string)
if !ok {
return nil, fmt.Errorf("multi analyzer analyzer name input must be a string list")
}
if len(text) != len(analyzer) {
return nil, fmt.Errorf("multi analyzer text and analyzer name inputs must have the same length")
}
rowNum := len(text)
result := make([][]*milvuspb.AnalyzerToken, rowNum)
wg := sync.WaitGroup{}
errCh := make(chan error, v.concurrency)
for i, j := 0, 0; i < v.concurrency && j < rowNum; i++ {
start := j
end := start + rowNum/v.concurrency
if i < rowNum%v.concurrency {
end += 1
}
wg.Add(1)
go func() {
defer wg.Done()
err := v.analyze(text[start:end], analyzer[start:end], result[start:end], withDetail, withHash)
if err != nil {
errCh <- err
return
}
}()
j = end
}
wg.Wait()
close(errCh)
for err := range errCh {
if err != nil {
return nil, err
}
}
return result, nil
}
func (v *MultiAnalyzerBM25FunctionRunner) GetSchema() *schemapb.FunctionSchema {
return v.schema
}


@ -134,6 +134,10 @@ func (m *GrpcQueryNodeClient) DeleteBatch(ctx context.Context, in *querypb.Delet
return &querypb.DeleteBatchResponse{}, m.Err
}
func (m *GrpcQueryNodeClient) RunAnalyzer(ctx context.Context, in *querypb.RunAnalyzerRequest, opts ...grpc.CallOption) (*milvuspb.RunAnalyzerResponse, error) {
return &milvuspb.RunAnalyzerResponse{}, m.Err
}
func (m *GrpcQueryNodeClient) Close() error {
return m.Err
}


@ -152,6 +152,10 @@ func (qn *qnServerWrapper) DeleteBatch(ctx context.Context, in *querypb.DeleteBa
return qn.QueryNode.DeleteBatch(ctx, in)
}
func (qn *qnServerWrapper) RunAnalyzer(ctx context.Context, in *querypb.RunAnalyzerRequest, _ ...grpc.CallOption) (*milvuspb.RunAnalyzerResponse, error) {
return qn.QueryNode.RunAnalyzer(ctx, in)
}
func WrapQueryNodeServerAsClient(qn types.QueryNode) types.QueryNodeClient {
return &qnServerWrapper{
QueryNode: qn,


@ -14,7 +14,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/json-iterator/go v1.1.12
github.com/klauspost/compress v1.17.7
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.13-0.20250520065018-13f9a20ffaad
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.13-0.20250527033022-8bab5b3dea49
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.34.1
github.com/panjf2000/ants/v2 v2.11.3


@ -490,6 +490,8 @@ github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZz
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.13-0.20250520065018-13f9a20ffaad h1:tkSvshW0g1nbf7gPIsku838FFvrlucnPtAXYBiGuTAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.13-0.20250520065018-13f9a20ffaad/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.13-0.20250527033022-8bab5b3dea49 h1:ERaE86WBbccuyqDpdb+JUFniZ7mSdhw6FK7m18PX+t8=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.13-0.20250527033022-8bab5b3dea49/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=


@ -177,6 +177,7 @@ service QueryNode {
// it's basically same as `Delete` but cost less memory pressure.
rpc DeleteBatch(DeleteBatchRequest) returns (DeleteBatchResponse) {
}
rpc RunAnalyzer(RunAnalyzerRequest) returns(milvus.RunAnalyzerResponse){}
}
// --------------------QueryCoord grpc request and response proto------------------
@ -945,3 +946,14 @@ message UpdateLoadConfigRequest {
int32 replica_number = 4;
repeated string resource_groups = 5;
}
message RunAnalyzerRequest{
common.MsgBase base = 1;
string channel = 2;
int64 field_id = 3;
repeated string analyzer_names = 4;
repeated bytes placeholder = 5;
bool with_detail = 6;
bool with_hash = 7;
}

File diff suppressed because it is too large.


@ -1547,6 +1547,7 @@ const (
QueryNode_SyncDistribution_FullMethodName = "/milvus.proto.query.QueryNode/SyncDistribution"
QueryNode_Delete_FullMethodName = "/milvus.proto.query.QueryNode/Delete"
QueryNode_DeleteBatch_FullMethodName = "/milvus.proto.query.QueryNode/DeleteBatch"
QueryNode_RunAnalyzer_FullMethodName = "/milvus.proto.query.QueryNode/RunAnalyzer"
)
// QueryNodeClient is the client API for QueryNode service.
@ -1581,6 +1582,7 @@ type QueryNodeClient interface {
// DeleteBatch is the API to apply same delete data into multiple segments.
// it's basically same as `Delete` but cost less memory pressure.
DeleteBatch(ctx context.Context, in *DeleteBatchRequest, opts ...grpc.CallOption) (*DeleteBatchResponse, error)
RunAnalyzer(ctx context.Context, in *RunAnalyzerRequest, opts ...grpc.CallOption) (*milvuspb.RunAnalyzerResponse, error)
}
type queryNodeClient struct {
@ -1862,6 +1864,15 @@ func (c *queryNodeClient) DeleteBatch(ctx context.Context, in *DeleteBatchReques
return out, nil
}
func (c *queryNodeClient) RunAnalyzer(ctx context.Context, in *RunAnalyzerRequest, opts ...grpc.CallOption) (*milvuspb.RunAnalyzerResponse, error) {
out := new(milvuspb.RunAnalyzerResponse)
err := c.cc.Invoke(ctx, QueryNode_RunAnalyzer_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// QueryNodeServer is the server API for QueryNode service.
// All implementations should embed UnimplementedQueryNodeServer
// for forward compatibility
@ -1894,6 +1905,7 @@ type QueryNodeServer interface {
// DeleteBatch is the API to apply same delete data into multiple segments.
// it's basically same as `Delete` but cost less memory pressure.
DeleteBatch(context.Context, *DeleteBatchRequest) (*DeleteBatchResponse, error)
RunAnalyzer(context.Context, *RunAnalyzerRequest) (*milvuspb.RunAnalyzerResponse, error)
}
// UnimplementedQueryNodeServer should be embedded to have forward compatible implementations.
@ -1975,6 +1987,9 @@ func (UnimplementedQueryNodeServer) Delete(context.Context, *DeleteRequest) (*co
func (UnimplementedQueryNodeServer) DeleteBatch(context.Context, *DeleteBatchRequest) (*DeleteBatchResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method DeleteBatch not implemented")
}
func (UnimplementedQueryNodeServer) RunAnalyzer(context.Context, *RunAnalyzerRequest) (*milvuspb.RunAnalyzerResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RunAnalyzer not implemented")
}
// UnsafeQueryNodeServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to QueryNodeServer will
@ -2443,6 +2458,24 @@ func _QueryNode_DeleteBatch_Handler(srv interface{}, ctx context.Context, dec fu
return interceptor(ctx, in, info, handler)
}
func _QueryNode_RunAnalyzer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RunAnalyzerRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(QueryNodeServer).RunAnalyzer(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: QueryNode_RunAnalyzer_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(QueryNodeServer).RunAnalyzer(ctx, req.(*RunAnalyzerRequest))
}
return interceptor(ctx, in, info, handler)
}
// QueryNode_ServiceDesc is the grpc.ServiceDesc for QueryNode service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -2542,6 +2575,10 @@ var QueryNode_ServiceDesc = grpc.ServiceDesc{
MethodName: "DeleteBatch",
Handler: _QueryNode_DeleteBatch_Handler,
},
{
MethodName: "RunAnalyzer",
Handler: _QueryNode_RunAnalyzer_Handler,
},
},
Streams: []grpc.StreamDesc{
{