fix: get schema panics when recover from channel checkpoint (#43605)

issue: #43597, #43598

Signed-off-by: chyezh <chyezh@outlook.com>
This commit is contained in:
Zhen Ye 2025-07-28 16:42:56 +08:00 committed by GitHub
parent 864d1b93b1
commit 5b9b895cb0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 15 additions and 10 deletions

View File

@ -155,14 +155,16 @@ func (eNode *embeddingNode) Operate(in []Msg) []Msg {
return []Msg{fgMsg} return []Msg{fgMsg}
} }
insertData, err := writebuffer.PrepareInsert(eNode.metaCache.GetSchema(fgMsg.TimeTick()), eNode.pkField, fgMsg.InsertMessages) insertData := make([]*writebuffer.InsertData, 0)
if err != nil { if len(fgMsg.InsertMessages) > 0 {
log.Error("failed to prepare insert data", zap.Error(err)) var err error
panic(err) if insertData, err = writebuffer.PrepareInsert(eNode.metaCache.GetSchema(fgMsg.TimeTick()), eNode.pkField, fgMsg.InsertMessages); err != nil {
log.Error("failed to prepare insert data", zap.Error(err))
panic(err)
}
} }
err = eNode.Embedding(insertData) if err := eNode.Embedding(insertData); err != nil {
if err != nil {
log.Warn("failed to embedding insert data", zap.Error(err)) log.Warn("failed to embedding insert data", zap.Error(err))
panic(err) panic(err)
} }

View File

@ -82,10 +82,13 @@ func (wNode *writeNode) Operate(in []Msg) []Msg {
start, end := fgMsg.StartPositions[0], fgMsg.EndPositions[0] start, end := fgMsg.StartPositions[0], fgMsg.EndPositions[0]
if fgMsg.InsertData == nil { if fgMsg.InsertData == nil {
insertData, err := writebuffer.PrepareInsert(wNode.metacache.GetSchema(fgMsg.TimeTick()), wNode.pkField, fgMsg.InsertMessages) insertData := make([]*writebuffer.InsertData, 0)
if err != nil { if len(fgMsg.InsertMessages) > 0 {
log.Error("failed to prepare data", zap.Error(err)) var err error
panic(err) if insertData, err = writebuffer.PrepareInsert(wNode.metacache.GetSchema(fgMsg.TimeTick()), wNode.pkField, fgMsg.InsertMessages); err != nil {
log.Error("failed to prepare data", zap.Error(err))
panic(err)
}
} }
fgMsg.InsertData = insertData fgMsg.InsertData = insertData
} }