From bb562c6a7ea51304bd5d8ed4ae59bdd72034c58e Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Thu, 15 May 2025 15:48:23 +0800 Subject: [PATCH] fix:[2.5] analyzer memory leak because function runner not close (#41840) relate: https://github.com/milvus-io/milvus/issues/41213 pr:https://github.com/milvus-io/milvus/pull/41839 Signed-off-by: aoiasd --- internal/datanode/importv2/util.go | 1 + .../pipeline/flow_graph_embedding_node.go | 6 +++ .../querynodev2/pipeline/embedding_node.go | 6 +++ internal/util/function/bm25_function.go | 4 ++ internal/util/function/function.go | 2 + internal/util/function/mock_function.go | 40 +++++++++++++++++-- .../function/multi_analyzer_bm25_function.go | 6 +++ internal/util/pipeline/node.go | 5 +++ internal/util/pipeline/pipeline.go | 1 + 9 files changed, 67 insertions(+), 4 deletions(-) diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index fa73c1f212..57aa16fe79 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -214,6 +214,7 @@ func RunEmbeddingFunction(task *ImportTask, data *storage.InsertData) error { if err != nil { return err } + defer runner.Close() inputFieldIDs := lo.Map(runner.GetInputFields(), func(field *schemapb.FieldSchema, _ int) int64 { return field.GetFieldID() }) inputDatas := make([]any, 0, len(inputFieldIDs)) diff --git a/internal/flushcommon/pipeline/flow_graph_embedding_node.go b/internal/flushcommon/pipeline/flow_graph_embedding_node.go index e63434cfb7..8c26be6020 100644 --- a/internal/flushcommon/pipeline/flow_graph_embedding_node.go +++ b/internal/flushcommon/pipeline/flow_graph_embedding_node.go @@ -165,6 +165,12 @@ func (eNode *embeddingNode) Operate(in []Msg) []Msg { return []Msg{fgMsg} } +func (eNode *embeddingNode) Close() { + for _, runner := range eNode.functionRunners { + runner.Close() + } +} + func BuildSparseFieldData(array *schemapb.SparseFloatArray) storage.FieldData { return &storage.SparseFloatVectorFieldData{ SparseFloatArray: schemapb.SparseFloatArray{ diff --git a/internal/querynodev2/pipeline/embedding_node.go b/internal/querynodev2/pipeline/embedding_node.go index 8b18024c0e..a2e10a6d24 100644 --- a/internal/querynodev2/pipeline/embedding_node.go +++ b/internal/querynodev2/pipeline/embedding_node.go @@ -203,6 +203,12 @@ func (eNode *embeddingNode) Operate(in Msg) Msg { return nodeMsg } +func (eNode *embeddingNode) Close() { + for _, functionRunner := range eNode.functionRunners { + functionRunner.Close() + } +} + func getEmbeddingFieldDatas(datas []*schemapb.FieldData, fieldIDs ...int64) ([]any, error) { result := []any{} for _, fieldID := range fieldIDs { diff --git a/internal/util/function/bm25_function.go b/internal/util/function/bm25_function.go index 43e1aa2795..703c8d62d5 100644 --- a/internal/util/function/bm25_function.go +++ b/internal/util/function/bm25_function.go @@ -171,6 +171,10 @@ func (v *BM25FunctionRunner) GetInputFields() []*schemapb.FieldSchema { return []*schemapb.FieldSchema{v.inputField} } +func (v *BM25FunctionRunner) Close() { + v.tokenizer.Destroy() +} + func buildSparseFloatArray(mapdata []map[uint32]float32) *schemapb.SparseFloatArray { dim := int64(0) bytes := lo.Map(mapdata, func(sparseMap map[uint32]float32, _ int) []byte { diff --git a/internal/util/function/function.go b/internal/util/function/function.go index a945a489a2..2042c19749 100644 --- a/internal/util/function/function.go +++ b/internal/util/function/function.go @@ -30,6 +30,8 @@ type FunctionRunner interface { GetSchema() *schemapb.FunctionSchema GetOutputFields() []*schemapb.FieldSchema GetInputFields() []*schemapb.FieldSchema + + Close() } func NewFunctionRunner(coll *schemapb.CollectionSchema, schema *schemapb.FunctionSchema) (FunctionRunner, error) { diff --git a/internal/util/function/mock_function.go b/internal/util/function/mock_function.go index 5a21bdc3da..244169aa19 100644 --- a/internal/util/function/mock_function.go +++ b/internal/util/function/mock_function.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.46.0. DO NOT EDIT. +// Code generated by mockery v2.53.3. DO NOT EDIT. package function @@ -87,7 +87,39 @@ func (_c *MockFunctionRunner_BatchRun_Call) RunAndReturn(run func(...interface{} return _c } -// GetInputFields provides a mock function with given fields: +// Close provides a mock function with no fields +func (_m *MockFunctionRunner) Close() { + _m.Called() +} + +// MockFunctionRunner_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockFunctionRunner_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockFunctionRunner_Expecter) Close() *MockFunctionRunner_Close_Call { + return &MockFunctionRunner_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockFunctionRunner_Close_Call) Run(run func()) *MockFunctionRunner_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockFunctionRunner_Close_Call) Return() *MockFunctionRunner_Close_Call { + _c.Call.Return() + return _c +} + +func (_c *MockFunctionRunner_Close_Call) RunAndReturn(run func()) *MockFunctionRunner_Close_Call { + _c.Run(run) + return _c +} + +// GetInputFields provides a mock function with no fields func (_m *MockFunctionRunner) GetInputFields() []*schemapb.FieldSchema { ret := _m.Called() @@ -134,7 +166,7 @@ func (_c *MockFunctionRunner_GetInputFields_Call) RunAndReturn(run func() []*sch return _c } -// GetOutputFields provides a mock function with given fields: +// GetOutputFields provides a mock function with no fields func (_m *MockFunctionRunner) GetOutputFields() []*schemapb.FieldSchema { ret := _m.Called() @@ -181,7 +213,7 @@ func (_c *MockFunctionRunner_GetOutputFields_Call) RunAndReturn(run func() []*sc return _c } -// GetSchema provides a mock function with given fields: +// GetSchema provides a mock function with no fields func (_m *MockFunctionRunner) GetSchema() *schemapb.FunctionSchema { ret := _m.Called() diff --git a/internal/util/function/multi_analyzer_bm25_function.go b/internal/util/function/multi_analyzer_bm25_function.go index 15887e4bf1..1b12b2602d 100644 --- a/internal/util/function/multi_analyzer_bm25_function.go +++ b/internal/util/function/multi_analyzer_bm25_function.go @@ -235,3 +235,9 @@ func (v *MultiAnalyzerBM25FunctionRunner) GetOutputFields() []*schemapb.FieldSch func (v *MultiAnalyzerBM25FunctionRunner) GetInputFields() []*schemapb.FieldSchema { return v.inputFields } + +func (v *MultiAnalyzerBM25FunctionRunner) Close() { + for _, analyzer := range v.analyzers { + analyzer.Destroy() + } +} diff --git a/internal/util/pipeline/node.go b/internal/util/pipeline/node.go index 77a294b9aa..5037d6507b 100644 --- a/internal/util/pipeline/node.go +++ b/internal/util/pipeline/node.go @@ -24,6 +24,8 @@ type Node interface { Name() string MaxQueueLength() int32 Operate(in Msg) Msg + + Close() } type nodeCtx struct { @@ -56,6 +58,9 @@ func (node *BaseNode) MaxQueueLength() int32 { return node.maxQueueLength } +func (node *BaseNode) Close() { +} + func NewBaseNode(name string, maxQueryLength int32) *BaseNode { return &BaseNode{ name: name, diff --git a/internal/util/pipeline/pipeline.go b/internal/util/pipeline/pipeline.go index 3ec1cff148..d439ab923c 100644 --- a/internal/util/pipeline/pipeline.go +++ b/internal/util/pipeline/pipeline.go @@ -70,6 +70,7 @@ func (p *pipeline) Start() error { func (p *pipeline) Close() { for _, node := range p.nodes { + node.node.Close() if node.Checker != nil { node.Checker.Close() }