From 201e980d3d078815f0f033991d75703d86fa36a0 Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Fri, 13 Jun 2025 22:14:37 +0800 Subject: [PATCH] fix: flow graph should free function resource after all node close (#42731) relate: https://github.com/milvus-io/milvus/issues/42730 Signed-off-by: aoiasd --- .../flushcommon/pipeline/flow_graph_embedding_node.go | 2 +- .../pipeline/flow_graph_embedding_node_test.go | 8 ++++---- internal/querynodev2/pipeline/pipeline.go | 3 +-- internal/util/flowgraph/flow_graph.go | 6 ++++++ internal/util/flowgraph/node.go | 4 ++++ 5 files changed, 16 insertions(+), 7 deletions(-) diff --git a/internal/flushcommon/pipeline/flow_graph_embedding_node.go b/internal/flushcommon/pipeline/flow_graph_embedding_node.go index 2d78af8d31..542ba9b149 100644 --- a/internal/flushcommon/pipeline/flow_graph_embedding_node.go +++ b/internal/flushcommon/pipeline/flow_graph_embedding_node.go @@ -171,7 +171,7 @@ func (eNode *embeddingNode) Operate(in []Msg) []Msg { return []Msg{fgMsg} } -func (eNode *embeddingNode) Close() { +func (eNode *embeddingNode) Free() { for _, runner := range eNode.functionRunners { runner.Close() } diff --git a/internal/flushcommon/pipeline/flow_graph_embedding_node_test.go b/internal/flushcommon/pipeline/flow_graph_embedding_node_test.go index 883df0cbae..a4cd9cefbc 100644 --- a/internal/flushcommon/pipeline/flow_graph_embedding_node_test.go +++ b/internal/flushcommon/pipeline/flow_graph_embedding_node_test.go @@ -73,7 +73,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) { t.Run("normal case", func(t *testing.T) { node, err := newEmbeddingNode("test-channel", metaCache) assert.NoError(t, err) - defer node.Close() + defer node.Free() var output []Msg assert.NotPanics(t, func() { @@ -115,7 +115,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) { t.Run("with close msg", func(t *testing.T) { node, err := newEmbeddingNode("test-channel", metaCache) assert.NoError(t, err) - defer node.Close() + defer node.Free() var output []Msg @@ -133,7 +133,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) { t.Run("prepare insert failed", func(t *testing.T) { node, err := newEmbeddingNode("test-channel", metaCache) assert.NoError(t, err) - defer node.Close() + defer node.Free() assert.Panics(t, func() { node.Operate([]Msg{ @@ -155,7 +155,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) { t.Run("embedding failed", func(t *testing.T) { node, err := newEmbeddingNode("test-channel", metaCache) assert.NoError(t, err) - defer node.Close() + defer node.Free() node.functionRunners[0].GetSchema().Type = 0 assert.Panics(t, func() { diff --git a/internal/querynodev2/pipeline/pipeline.go b/internal/querynodev2/pipeline/pipeline.go index 408d4241ce..936fcee6f4 100644 --- a/internal/querynodev2/pipeline/pipeline.go +++ b/internal/querynodev2/pipeline/pipeline.go @@ -34,8 +34,7 @@ type Pipeline interface { type pipeline struct { base.StreamPipeline - collectionID UniqueID - embeddingNode embeddingNode + collectionID UniqueID } func (p *pipeline) GetCollectionID() UniqueID { diff --git a/internal/util/flowgraph/flow_graph.go b/internal/util/flowgraph/flow_graph.go index 1de19af539..6e7f4f87cf 100644 --- a/internal/util/flowgraph/flow_graph.go +++ b/internal/util/flowgraph/flow_graph.go @@ -121,6 +121,12 @@ func (fg *TimeTickedFlowGraph) Close() { } } fg.closeWg.Wait() + + // free some source after all node close. + // such as function. + for _, v := range fg.nodeCtx { + v.node.Free() + } }) } diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go index b1d4b4a7b5..3ad60da154 100644 --- a/internal/util/flowgraph/node.go +++ b/internal/util/flowgraph/node.go @@ -47,6 +47,7 @@ type Node interface { IsInputNode() bool Start() Close() + Free() } // BaseNode defines some common node attributes and behavior @@ -239,6 +240,9 @@ func (node *BaseNode) Start() {} // Close implementing Node, base node does nothing when stops func (node *BaseNode) Close() {} +// Free resource after all node close +func (node *BaseNode) Free() {} + func (node *BaseNode) Name() string { return "BaseNode" }