mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix:[2.5] analyzer memory leak because function runner not close (#41840)
relate: https://github.com/milvus-io/milvus/issues/41213 pr:https://github.com/milvus-io/milvus/pull/41839 Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
parent
e36df6991b
commit
bb562c6a7e
@ -214,6 +214,7 @@ func RunEmbeddingFunction(task *ImportTask, data *storage.InsertData) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer runner.Close()
|
||||
|
||||
inputFieldIDs := lo.Map(runner.GetInputFields(), func(field *schemapb.FieldSchema, _ int) int64 { return field.GetFieldID() })
|
||||
inputDatas := make([]any, 0, len(inputFieldIDs))
|
||||
|
||||
@ -165,6 +165,12 @@ func (eNode *embeddingNode) Operate(in []Msg) []Msg {
|
||||
return []Msg{fgMsg}
|
||||
}
|
||||
|
||||
func (eNode *embeddingNode) Close() {
|
||||
for _, runner := range eNode.functionRunners {
|
||||
runner.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func BuildSparseFieldData(array *schemapb.SparseFloatArray) storage.FieldData {
|
||||
return &storage.SparseFloatVectorFieldData{
|
||||
SparseFloatArray: schemapb.SparseFloatArray{
|
||||
|
||||
@ -203,6 +203,12 @@ func (eNode *embeddingNode) Operate(in Msg) Msg {
|
||||
return nodeMsg
|
||||
}
|
||||
|
||||
func (eNode *embeddingNode) Close() {
|
||||
for _, functionRunner := range eNode.functionRunners {
|
||||
functionRunner.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func getEmbeddingFieldDatas(datas []*schemapb.FieldData, fieldIDs ...int64) ([]any, error) {
|
||||
result := []any{}
|
||||
for _, fieldID := range fieldIDs {
|
||||
|
||||
@ -171,6 +171,10 @@ func (v *BM25FunctionRunner) GetInputFields() []*schemapb.FieldSchema {
|
||||
return []*schemapb.FieldSchema{v.inputField}
|
||||
}
|
||||
|
||||
func (v *BM25FunctionRunner) Close() {
|
||||
v.tokenizer.Destroy()
|
||||
}
|
||||
|
||||
func buildSparseFloatArray(mapdata []map[uint32]float32) *schemapb.SparseFloatArray {
|
||||
dim := int64(0)
|
||||
bytes := lo.Map(mapdata, func(sparseMap map[uint32]float32, _ int) []byte {
|
||||
|
||||
@ -30,6 +30,8 @@ type FunctionRunner interface {
|
||||
GetSchema() *schemapb.FunctionSchema
|
||||
GetOutputFields() []*schemapb.FieldSchema
|
||||
GetInputFields() []*schemapb.FieldSchema
|
||||
|
||||
Close()
|
||||
}
|
||||
|
||||
func NewFunctionRunner(coll *schemapb.CollectionSchema, schema *schemapb.FunctionSchema) (FunctionRunner, error) {
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
// Code generated by mockery v2.46.0. DO NOT EDIT.
|
||||
// Code generated by mockery v2.53.3. DO NOT EDIT.
|
||||
|
||||
package function
|
||||
|
||||
@ -87,7 +87,39 @@ func (_c *MockFunctionRunner_BatchRun_Call) RunAndReturn(run func(...interface{}
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetInputFields provides a mock function with given fields:
|
||||
// Close provides a mock function with no fields
|
||||
func (_m *MockFunctionRunner) Close() {
|
||||
_m.Called()
|
||||
}
|
||||
|
||||
// MockFunctionRunner_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
|
||||
type MockFunctionRunner_Close_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// Close is a helper method to define mock.On call
|
||||
func (_e *MockFunctionRunner_Expecter) Close() *MockFunctionRunner_Close_Call {
|
||||
return &MockFunctionRunner_Close_Call{Call: _e.mock.On("Close")}
|
||||
}
|
||||
|
||||
func (_c *MockFunctionRunner_Close_Call) Run(run func()) *MockFunctionRunner_Close_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run()
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockFunctionRunner_Close_Call) Return() *MockFunctionRunner_Close_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockFunctionRunner_Close_Call) RunAndReturn(run func()) *MockFunctionRunner_Close_Call {
|
||||
_c.Run(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetInputFields provides a mock function with no fields
|
||||
func (_m *MockFunctionRunner) GetInputFields() []*schemapb.FieldSchema {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -134,7 +166,7 @@ func (_c *MockFunctionRunner_GetInputFields_Call) RunAndReturn(run func() []*sch
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetOutputFields provides a mock function with given fields:
|
||||
// GetOutputFields provides a mock function with no fields
|
||||
func (_m *MockFunctionRunner) GetOutputFields() []*schemapb.FieldSchema {
|
||||
ret := _m.Called()
|
||||
|
||||
@ -181,7 +213,7 @@ func (_c *MockFunctionRunner_GetOutputFields_Call) RunAndReturn(run func() []*sc
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetSchema provides a mock function with given fields:
|
||||
// GetSchema provides a mock function with no fields
|
||||
func (_m *MockFunctionRunner) GetSchema() *schemapb.FunctionSchema {
|
||||
ret := _m.Called()
|
||||
|
||||
|
||||
@ -235,3 +235,9 @@ func (v *MultiAnalyzerBM25FunctionRunner) GetOutputFields() []*schemapb.FieldSch
|
||||
func (v *MultiAnalyzerBM25FunctionRunner) GetInputFields() []*schemapb.FieldSchema {
|
||||
return v.inputFields
|
||||
}
|
||||
|
||||
func (v *MultiAnalyzerBM25FunctionRunner) Close() {
|
||||
for _, analyzer := range v.analyzers {
|
||||
analyzer.Destroy()
|
||||
}
|
||||
}
|
||||
|
||||
@ -24,6 +24,8 @@ type Node interface {
|
||||
Name() string
|
||||
MaxQueueLength() int32
|
||||
Operate(in Msg) Msg
|
||||
|
||||
Close()
|
||||
}
|
||||
|
||||
type nodeCtx struct {
|
||||
@ -56,6 +58,9 @@ func (node *BaseNode) MaxQueueLength() int32 {
|
||||
return node.maxQueueLength
|
||||
}
|
||||
|
||||
func (node *BaseNode) Close() {
|
||||
}
|
||||
|
||||
func NewBaseNode(name string, maxQueryLength int32) *BaseNode {
|
||||
return &BaseNode{
|
||||
name: name,
|
||||
|
||||
@ -70,6 +70,7 @@ func (p *pipeline) Start() error {
|
||||
|
||||
func (p *pipeline) Close() {
|
||||
for _, node := range p.nodes {
|
||||
node.node.Close()
|
||||
if node.Checker != nil {
|
||||
node.Checker.Close()
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user