diff --git a/internal/flushcommon/pipeline/flow_graph_embedding_node.go b/internal/flushcommon/pipeline/flow_graph_embedding_node.go index 2a9a476af9..ce29a10822 100644 --- a/internal/flushcommon/pipeline/flow_graph_embedding_node.go +++ b/internal/flushcommon/pipeline/flow_graph_embedding_node.go @@ -155,14 +155,16 @@ func (eNode *embeddingNode) Operate(in []Msg) []Msg { return []Msg{fgMsg} } - insertData, err := writebuffer.PrepareInsert(eNode.metaCache.GetSchema(fgMsg.TimeTick()), eNode.pkField, fgMsg.InsertMessages) - if err != nil { - log.Error("failed to prepare insert data", zap.Error(err)) - panic(err) + insertData := make([]*writebuffer.InsertData, 0) + if len(fgMsg.InsertMessages) > 0 { + var err error + if insertData, err = writebuffer.PrepareInsert(eNode.metaCache.GetSchema(fgMsg.TimeTick()), eNode.pkField, fgMsg.InsertMessages); err != nil { + log.Error("failed to prepare insert data", zap.Error(err)) + panic(err) + } } - err = eNode.Embedding(insertData) - if err != nil { + if err := eNode.Embedding(insertData); err != nil { log.Warn("failed to embedding insert data", zap.Error(err)) panic(err) } diff --git a/internal/flushcommon/pipeline/flow_graph_write_node.go b/internal/flushcommon/pipeline/flow_graph_write_node.go index 9988711797..6e5c17b07d 100644 --- a/internal/flushcommon/pipeline/flow_graph_write_node.go +++ b/internal/flushcommon/pipeline/flow_graph_write_node.go @@ -82,10 +82,13 @@ func (wNode *writeNode) Operate(in []Msg) []Msg { start, end := fgMsg.StartPositions[0], fgMsg.EndPositions[0] if fgMsg.InsertData == nil { - insertData, err := writebuffer.PrepareInsert(wNode.metacache.GetSchema(fgMsg.TimeTick()), wNode.pkField, fgMsg.InsertMessages) - if err != nil { - log.Error("failed to prepare data", zap.Error(err)) - panic(err) + insertData := make([]*writebuffer.InsertData, 0) + if len(fgMsg.InsertMessages) > 0 { + var err error + if insertData, err = writebuffer.PrepareInsert(wNode.metacache.GetSchema(fgMsg.TimeTick()), wNode.pkField, fgMsg.InsertMessages); err != nil { + log.Error("failed to prepare data", zap.Error(err)) + panic(err) + } } fgMsg.InsertData = insertData }