Fix datanode graceful stop panic (#25932)

See also: #25925

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2023-07-28 10:11:08 +08:00 committed by GitHub
parent 833674c1cb
commit 84253f255e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 28 additions and 26 deletions

View File

@ -113,6 +113,7 @@ type DataNode struct {
//call once //call once
initOnce sync.Once initOnce sync.Once
startOnce sync.Once startOnce sync.Once
stopOnce sync.Once
wg sync.WaitGroup wg sync.WaitGroup
sessionMu sync.Mutex // to fix data race sessionMu sync.Mutex // to fix data race
session *sessionutil.Session session *sessionutil.Session
@ -583,34 +584,32 @@ func (node *DataNode) ReadyToFlush() error {
// Stop will release DataNode resources and shutdown datanode // Stop will release DataNode resources and shutdown datanode
func (node *DataNode) Stop() error { func (node *DataNode) Stop() error {
// https://github.com/milvus-io/milvus/issues/12282 node.stopOnce.Do(func() {
node.UpdateStateCode(commonpb.StateCode_Abnormal) node.cancel()
node.flowgraphManager.dropAll() // https://github.com/milvus-io/milvus/issues/12282
node.flowgraphManager.stop() node.UpdateStateCode(commonpb.StateCode_Abnormal)
node.flowgraphManager.close()
node.cancel() node.eventManagerMap.Range(func(_ string, m *channelEventManager) bool {
node.wg.Wait() m.Close()
node.eventManagerMap.Range(func(_ string, m *channelEventManager) bool { return true
m.Close() })
return true
})
if node.allocator != nil { if node.allocator != nil {
log.Info("close id allocator", zap.String("role", typeutil.DataNodeRole)) log.Info("close id allocator", zap.String("role", typeutil.DataNodeRole))
node.allocator.Close() node.allocator.Close()
}
if node.closer != nil {
err := node.closer.Close()
if err != nil {
return err
} }
}
if node.session != nil { if node.closer != nil {
node.session.Stop() node.closer.Close()
} }
if node.session != nil {
node.session.Stop()
}
node.wg.Wait()
})
return nil return nil
} }

View File

@ -61,7 +61,8 @@ func (fm *flowgraphManager) start() {
} }
} }
func (fm *flowgraphManager) stop() { func (fm *flowgraphManager) close() {
fm.dropAll()
fm.closeOnce.Do(func() { fm.closeOnce.Do(func() {
close(fm.closeCh) close(fm.closeCh)
}) })

View File

@ -70,7 +70,7 @@ var rateCol *ratelimitutil.RateCollector
// Proxy of milvus // Proxy of milvus
type Proxy struct { type Proxy struct {
ctx context.Context ctx context.Context
cancel func() cancel context.CancelFunc
wg sync.WaitGroup wg sync.WaitGroup
initParams *internalpb.InitParams initParams *internalpb.InitParams

View File

@ -66,7 +66,9 @@ func (inNode *InputNode) Operate(in []Msg) []Msg {
if !ok { if !ok {
log.Warn("input closed", zap.Any("input node", inNode.Name())) log.Warn("input closed", zap.Any("input node", inNode.Name()))
if inNode.lastMsg != nil { if inNode.lastMsg != nil {
log.Info("trigger force sync", zap.Int64("collection", inNode.collectionID), zap.Any("position", inNode.lastMsg)) log.Info("trigger force sync",
zap.Int64("collection", inNode.collectionID),
zap.Any("position", inNode.lastMsg.EndPositions))
return []Msg{&MsgStreamMsg{ return []Msg{&MsgStreamMsg{
BaseMsg: NewBaseMsg(true), BaseMsg: NewBaseMsg(true),
tsMessages: []msgstream.TsMsg{}, tsMessages: []msgstream.TsMsg{},