Send SIGINT to runner goroutine after etcd disconnects (#12163)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2021-11-22 16:23:17 +08:00 committed by GitHub
parent 02bf77aabf
commit 4121e31df1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 28 additions and 0 deletions

View File

@ -23,6 +23,7 @@ import (
"math/rand" "math/rand"
"sync" "sync"
"sync/atomic" "sync/atomic"
"syscall"
"time" "time"
datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
@ -94,6 +95,7 @@ type Server struct {
serverLoopCtx context.Context serverLoopCtx context.Context
serverLoopCancel context.CancelFunc serverLoopCancel context.CancelFunc
serverLoopWg sync.WaitGroup serverLoopWg sync.WaitGroup
quitCh chan struct{}
isServing ServerState isServing ServerState
helper ServerHelper helper ServerHelper
@ -178,6 +180,7 @@ func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
s := &Server{ s := &Server{
ctx: ctx, ctx: ctx,
quitCh: make(chan struct{}),
msFactory: factory, msFactory: factory,
flushCh: make(chan UniqueID, 1024), flushCh: make(chan UniqueID, 1024),
dataNodeCreator: defaultDataNodeCreatorFunc, dataNodeCreator: defaultDataNodeCreatorFunc,
@ -201,6 +204,11 @@ func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdE
return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints) return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
} }
// QuitSignal returns signal when server quits
func (s *Server) QuitSignal() <-chan struct{} {
return s.quitCh
}
// Register register data service at etcd // Register register data service at etcd
func (s *Server) Register() error { func (s *Server) Register() error {
s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints) s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
@ -402,6 +410,8 @@ func (s *Server) startServerLoop() {
if err := s.Stop(); err != nil { if err := s.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err)) log.Fatal("failed to stop server", zap.Error(err))
} }
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}) })
} }

View File

@ -30,6 +30,7 @@ import (
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"syscall"
"time" "time"
"github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/common"
@ -186,6 +187,8 @@ func (node *DataNode) Register() error {
if err := node.Stop(); err != nil { if err := node.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err)) log.Fatal("failed to stop server", zap.Error(err))
} }
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}) })
Params.initMsgChannelSubName() Params.initMsgChannelSubName()

View File

@ -24,6 +24,7 @@ import (
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"syscall"
"time" "time"
"github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/common"
@ -250,6 +251,8 @@ func (i *IndexCoord) Start() error {
if err := i.Stop(); err != nil { if err := i.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err)) log.Fatal("failed to stop server", zap.Error(err))
} }
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}) })
startErr = i.sched.Start() startErr = i.sched.Start()

View File

@ -35,6 +35,7 @@ import (
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"syscall"
"time" "time"
"unsafe" "unsafe"
@ -196,6 +197,8 @@ func (i *IndexNode) Start() error {
if err := i.Stop(); err != nil { if err := i.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err)) log.Fatal("failed to stop server", zap.Error(err))
} }
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}) })
i.UpdateStateCode(internalpb.StateCode_Healthy) i.UpdateStateCode(internalpb.StateCode_Healthy)

View File

@ -21,6 +21,7 @@ import (
"errors" "errors"
"math" "math"
"sort" "sort"
"syscall"
"fmt" "fmt"
"math/rand" "math/rand"
@ -209,6 +210,8 @@ func (qc *QueryCoord) Start() error {
if err := qc.Stop(); err != nil { if err := qc.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err)) log.Fatal("failed to stop server", zap.Error(err))
} }
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}) })
return nil return nil

View File

@ -32,6 +32,7 @@ import (
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"syscall"
"time" "time"
"unsafe" "unsafe"
@ -131,6 +132,8 @@ func (node *QueryNode) Register() error {
if err := node.Stop(); err != nil { if err := node.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err)) log.Fatal("failed to stop server", zap.Error(err))
} }
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}) })
Params.QueryNodeID = node.session.ServerID Params.QueryNodeID = node.session.ServerID

View File

@ -19,6 +19,7 @@ import (
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"syscall"
"time" "time"
"github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/common"
@ -1170,6 +1171,8 @@ func (c *Core) Start() error {
if err := c.Stop(); err != nil { if err := c.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err)) log.Fatal("failed to stop server", zap.Error(err))
} }
// manually send signal to starter goroutine
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}) })
Params.CreatedTime = time.Now() Params.CreatedTime = time.Now()
Params.UpdatedTime = time.Now() Params.UpdatedTime = time.Now()