From 08a53c56b1ff0098c033cf15746f1f0dcc1488ba Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 28 May 2025 11:30:28 +0800 Subject: [PATCH] fix: [AddField] Use metacache schema in embedding node (#42115) Related to #42084 The embedding node cached the schema when it was created, causing a schema mismatch after a schema change. This PR makes embeddingNode read the schema from the metacache, which is kept up to date when the schema changes. --------- Signed-off-by: Congqi Xia --- internal/flushcommon/pipeline/data_sync_service.go | 2 +- .../pipeline/flow_graph_embedding_node.go | 11 +++++++---- .../pipeline/flow_graph_embedding_node_test.go | 12 ++++++++---- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/internal/flushcommon/pipeline/data_sync_service.go b/internal/flushcommon/pipeline/data_sync_service.go index ecd498634f..3636cdf5f0 100644 --- a/internal/flushcommon/pipeline/data_sync_service.go +++ b/internal/flushcommon/pipeline/data_sync_service.go @@ -288,7 +288,7 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams, nodeList = append(nodeList, ddNode) if len(info.GetSchema().GetFunctions()) > 0 { - emNode, err := newEmbeddingNode(channelName, info.GetSchema()) + emNode, err := newEmbeddingNode(channelName, config.metacache) if err != nil { return nil, err } diff --git a/internal/flushcommon/pipeline/flow_graph_embedding_node.go b/internal/flushcommon/pipeline/flow_graph_embedding_node.go index 5292683dc9..ca3cb5ff2c 100644 --- a/internal/flushcommon/pipeline/flow_graph_embedding_node.go +++ b/internal/flushcommon/pipeline/flow_graph_embedding_node.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/flushcommon/metacache" "github.com/milvus-io/milvus/internal/flushcommon/writebuffer" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/function" @@ -37,7 +38,7 @@ import ( type embeddingNode struct { BaseNode - schema *schemapb.CollectionSchema 
+ metaCache metacache.MetaCache pkField *schemapb.FieldSchema channelName string @@ -45,7 +46,7 @@ type embeddingNode struct { functionRunners map[int64]function.FunctionRunner } -func newEmbeddingNode(channelName string, schema *schemapb.CollectionSchema) (*embeddingNode, error) { +func newEmbeddingNode(channelName string, metaCache metacache.MetaCache) (*embeddingNode, error) { baseNode := BaseNode{} baseNode.SetMaxQueueLength(paramtable.Get().DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()) baseNode.SetMaxParallelism(paramtable.Get().DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32()) @@ -53,10 +54,12 @@ func newEmbeddingNode(channelName string, schema *schemapb.CollectionSchema) (*e node := &embeddingNode{ BaseNode: baseNode, channelName: channelName, - schema: schema, + metaCache: metaCache, functionRunners: make(map[int64]function.FunctionRunner), } + schema := metaCache.Schema() + for _, field := range schema.GetFields() { if field.GetIsPrimaryKey() { node.pkField = field @@ -152,7 +155,7 @@ func (eNode *embeddingNode) Operate(in []Msg) []Msg { return []Msg{fgMsg} } - insertData, err := writebuffer.PrepareInsert(eNode.schema, eNode.pkField, fgMsg.InsertMessages) + insertData, err := writebuffer.PrepareInsert(eNode.metaCache.Schema(), eNode.pkField, fgMsg.InsertMessages) if err != nil { log.Error("failed to prepare insert data", zap.Error(err)) panic(err) diff --git a/internal/flushcommon/pipeline/flow_graph_embedding_node_test.go b/internal/flushcommon/pipeline/flow_graph_embedding_node_test.go index 709534e2b1..73c420f074 100644 --- a/internal/flushcommon/pipeline/flow_graph_embedding_node_test.go +++ b/internal/flushcommon/pipeline/flow_graph_embedding_node_test.go @@ -24,6 +24,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/flushcommon/metacache" 
"github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/mq/msgstream" @@ -66,8 +67,11 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) { }}, } + metaCache := metacache.NewMockMetaCache(t) + metaCache.EXPECT().Schema().Return(collSchema) + t.Run("normal case", func(t *testing.T) { - node, err := newEmbeddingNode("test-channel", collSchema) + node, err := newEmbeddingNode("test-channel", metaCache) assert.NoError(t, err) var output []Msg @@ -108,7 +112,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) { }) t.Run("with close msg", func(t *testing.T) { - node, err := newEmbeddingNode("test-channel", collSchema) + node, err := newEmbeddingNode("test-channel", metaCache) assert.NoError(t, err) var output []Msg @@ -125,7 +129,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) { }) t.Run("prepare insert failed", func(t *testing.T) { - node, err := newEmbeddingNode("test-channel", collSchema) + node, err := newEmbeddingNode("test-channel", metaCache) assert.NoError(t, err) assert.Panics(t, func() { @@ -146,7 +150,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) { }) t.Run("embedding failed", func(t *testing.T) { - node, err := newEmbeddingNode("test-channel", collSchema) + node, err := newEmbeddingNode("test-channel", metaCache) assert.NoError(t, err) node.functionRunners[0].GetSchema().Type = 0