fix: [AddField] Use metacache schema in embedding node (#42115)

Related to #42084

Embedding node cached schema when created, causing schema mismatch after
schema change. This PR make embeddingNode use schema from metacache,
which will be updated.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2025-05-28 11:30:28 +08:00 committed by GitHub
parent da30e1e4df
commit 08a53c56b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 16 additions and 9 deletions

View File

@ -288,7 +288,7 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
nodeList = append(nodeList, ddNode)
if len(info.GetSchema().GetFunctions()) > 0 {
emNode, err := newEmbeddingNode(channelName, info.GetSchema())
emNode, err := newEmbeddingNode(channelName, config.metacache)
if err != nil {
return nil, err
}

View File

@ -24,6 +24,7 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/function"
@ -37,7 +38,7 @@ import (
type embeddingNode struct {
BaseNode
schema *schemapb.CollectionSchema
metaCache metacache.MetaCache
pkField *schemapb.FieldSchema
channelName string
@ -45,7 +46,7 @@ type embeddingNode struct {
functionRunners map[int64]function.FunctionRunner
}
func newEmbeddingNode(channelName string, schema *schemapb.CollectionSchema) (*embeddingNode, error) {
func newEmbeddingNode(channelName string, metaCache metacache.MetaCache) (*embeddingNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(paramtable.Get().DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32())
baseNode.SetMaxParallelism(paramtable.Get().DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32())
@ -53,10 +54,12 @@ func newEmbeddingNode(channelName string, schema *schemapb.CollectionSchema) (*e
node := &embeddingNode{
BaseNode: baseNode,
channelName: channelName,
schema: schema,
metaCache: metaCache,
functionRunners: make(map[int64]function.FunctionRunner),
}
schema := metaCache.Schema()
for _, field := range schema.GetFields() {
if field.GetIsPrimaryKey() {
node.pkField = field
@ -152,7 +155,7 @@ func (eNode *embeddingNode) Operate(in []Msg) []Msg {
return []Msg{fgMsg}
}
insertData, err := writebuffer.PrepareInsert(eNode.schema, eNode.pkField, fgMsg.InsertMessages)
insertData, err := writebuffer.PrepareInsert(eNode.metaCache.Schema(), eNode.pkField, fgMsg.InsertMessages)
if err != nil {
log.Error("failed to prepare insert data", zap.Error(err))
panic(err)

View File

@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
@ -66,8 +67,11 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) {
}},
}
metaCache := metacache.NewMockMetaCache(t)
metaCache.EXPECT().Schema().Return(collSchema)
t.Run("normal case", func(t *testing.T) {
node, err := newEmbeddingNode("test-channel", collSchema)
node, err := newEmbeddingNode("test-channel", metaCache)
assert.NoError(t, err)
var output []Msg
@ -108,7 +112,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) {
})
t.Run("with close msg", func(t *testing.T) {
node, err := newEmbeddingNode("test-channel", collSchema)
node, err := newEmbeddingNode("test-channel", metaCache)
assert.NoError(t, err)
var output []Msg
@ -125,7 +129,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) {
})
t.Run("prepare insert failed", func(t *testing.T) {
node, err := newEmbeddingNode("test-channel", collSchema)
node, err := newEmbeddingNode("test-channel", metaCache)
assert.NoError(t, err)
assert.Panics(t, func() {
@ -146,7 +150,7 @@ func TestEmbeddingNode_BM25_Operator(t *testing.T) {
})
t.Run("embedding failed", func(t *testing.T) {
node, err := newEmbeddingNode("test-channel", collSchema)
node, err := newEmbeddingNode("test-channel", metaCache)
assert.NoError(t, err)
node.functionRunners[0].GetSchema().Type = 0