fix: flow graph should free function resource after all node close (#42731)

relate: https://github.com/milvus-io/milvus/issues/42730

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
This commit is contained in:
aoiasd 2025-06-13 22:14:37 +08:00 committed by GitHub
parent 9acba25fad
commit 201e980d3d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 16 additions and 7 deletions

View File

@ -171,7 +171,7 @@ func (eNode *embeddingNode) Operate(in []Msg) []Msg {
return []Msg{fgMsg}
}
func (eNode *embeddingNode) Close() {
func (eNode *embeddingNode) Free() {
for _, runner := range eNode.functionRunners {
runner.Close()
}

View File

@ -73,7 +73,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
node, err := newEmbeddingNode("test-channel", metaCache)
assert.NoError(t, err)
defer node.Close()
defer node.Free()
var output []Msg
assert.NotPanics(t, func() {
@ -115,7 +115,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) {
t.Run("with close msg", func(t *testing.T) {
node, err := newEmbeddingNode("test-channel", metaCache)
assert.NoError(t, err)
defer node.Close()
defer node.Free()
var output []Msg
@ -133,7 +133,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) {
t.Run("prepare insert failed", func(t *testing.T) {
node, err := newEmbeddingNode("test-channel", metaCache)
assert.NoError(t, err)
defer node.Close()
defer node.Free()
assert.Panics(t, func() {
node.Operate([]Msg{
@ -155,7 +155,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) {
t.Run("embedding failed", func(t *testing.T) {
node, err := newEmbeddingNode("test-channel", metaCache)
assert.NoError(t, err)
defer node.Close()
defer node.Free()
node.functionRunners[0].GetSchema().Type = 0
assert.Panics(t, func() {

View File

@ -34,8 +34,7 @@ type Pipeline interface {
type pipeline struct {
base.StreamPipeline
collectionID UniqueID
embeddingNode embeddingNode
collectionID UniqueID
}
func (p *pipeline) GetCollectionID() UniqueID {

View File

@ -121,6 +121,12 @@ func (fg *TimeTickedFlowGraph) Close() {
}
}
fg.closeWg.Wait()
// free some source after all node close.
// such as function.
for _, v := range fg.nodeCtx {
v.node.Free()
}
})
}

View File

@ -47,6 +47,7 @@ type Node interface {
IsInputNode() bool
Start()
Close()
Free()
}
// BaseNode defines some common node attributes and behavior
@ -239,6 +240,9 @@ func (node *BaseNode) Start() {}
// Close implementing Node, base node does nothing when stops
func (node *BaseNode) Close() {}
// Free resource after all node close
func (node *BaseNode) Free() {}
func (node *BaseNode) Name() string {
return "BaseNode"
}