From 4121e31df1355a9bd86f97caf873a679264e2d5c Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 22 Nov 2021 16:23:17 +0800 Subject: [PATCH] Send SIGINT to runner goroutine after etcd disconnects (#12163) Signed-off-by: Congqi Xia --- internal/datacoord/server.go | 10 ++++++++++ internal/datanode/data_node.go | 3 +++ internal/indexcoord/index_coord.go | 3 +++ internal/indexnode/indexnode.go | 3 +++ internal/querycoord/query_coord.go | 3 +++ internal/querynode/query_node.go | 3 +++ internal/rootcoord/root_coord.go | 3 +++ 7 files changed, 28 insertions(+) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index a6d9d2cec3..7fdacafdbb 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -23,6 +23,7 @@ import ( "math/rand" "sync" "sync/atomic" + "syscall" "time" datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" @@ -94,6 +95,7 @@ type Server struct { serverLoopCtx context.Context serverLoopCancel context.CancelFunc serverLoopWg sync.WaitGroup + quitCh chan struct{} isServing ServerState helper ServerHelper @@ -178,6 +180,7 @@ func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option rand.Seed(time.Now().UnixNano()) s := &Server{ ctx: ctx, + quitCh: make(chan struct{}), msFactory: factory, flushCh: make(chan UniqueID, 1024), dataNodeCreator: defaultDataNodeCreatorFunc, @@ -201,6 +204,11 @@ func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdE return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints) } +// QuitSignal returns signal when server quits +func (s *Server) QuitSignal() <-chan struct{} { + return s.quitCh +} + // Register register data service at etcd func (s *Server) Register() error { s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints) @@ -402,6 +410,8 @@ func (s *Server) startServerLoop() { if err := s.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } + // manually send signal to starter goroutine + syscall.Kill(syscall.Getpid(), syscall.SIGINT) }) } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index fbff96ad82..395db3229f 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -30,6 +30,7 @@ import ( "strings" "sync" "sync/atomic" + "syscall" "time" "github.com/milvus-io/milvus/internal/common" @@ -186,6 +187,8 @@ func (node *DataNode) Register() error { if err := node.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } + // manually send signal to starter goroutine + syscall.Kill(syscall.Getpid(), syscall.SIGINT) }) Params.initMsgChannelSubName() diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index ee36ff31b9..eca3908800 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -24,6 +24,7 @@ import ( "strconv" "sync" "sync/atomic" + "syscall" "time" "github.com/milvus-io/milvus/internal/common" @@ -250,6 +251,8 @@ func (i *IndexCoord) Start() error { if err := i.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } + // manually send signal to starter goroutine + syscall.Kill(syscall.Getpid(), syscall.SIGINT) }) startErr = i.sched.Start() diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 5b83c72d27..d5f31a3142 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -35,6 +35,7 @@ import ( "strconv" "sync" "sync/atomic" + "syscall" "time" "unsafe" @@ -196,6 +197,8 @@ func (i *IndexNode) Start() error { if err := i.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } + // manually send signal to starter goroutine + syscall.Kill(syscall.Getpid(), syscall.SIGINT) }) i.UpdateStateCode(internalpb.StateCode_Healthy) diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 8b39a96ba8..f9c07b5250 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -21,6 +21,7 @@ import ( "errors" "math" "sort" + "syscall" "fmt" "math/rand" @@ -209,6 +210,8 @@ func (qc *QueryCoord) Start() error { if err := qc.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } + // manually send signal to starter goroutine + syscall.Kill(syscall.Getpid(), syscall.SIGINT) }) return nil diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index 5c37fdf628..47892121ad 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -32,6 +32,7 @@ import ( "strconv" "sync" "sync/atomic" + "syscall" "time" "unsafe" @@ -131,6 +132,8 @@ func (node *QueryNode) Register() error { if err := node.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } + // manually send signal to starter goroutine + syscall.Kill(syscall.Getpid(), syscall.SIGINT) }) Params.QueryNodeID = node.session.ServerID diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 0749d43344..13b74c3b43 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -19,6 +19,7 @@ import ( "strconv" "sync" "sync/atomic" + "syscall" "time" "github.com/milvus-io/milvus/internal/common" @@ -1170,6 +1171,8 @@ func (c *Core) Start() error { if err := c.Stop(); err != nil { log.Fatal("failed to stop server", zap.Error(err)) } + // manually send signal to starter goroutine + syscall.Kill(syscall.Getpid(), syscall.SIGINT) }) Params.CreatedTime = time.Now() Params.UpdatedTime = time.Now()