diff --git a/internal/coordinator/snmanager/streaming_node_manager.go b/internal/coordinator/snmanager/streaming_node_manager.go index c95285ea07..1ffc6ae497 100644 --- a/internal/coordinator/snmanager/streaming_node_manager.go +++ b/internal/coordinator/snmanager/streaming_node_manager.go @@ -67,6 +67,7 @@ type StreamingNodeManager struct { cond *syncutil.ContextCond latestAssignments map[string]types.PChannelInfoAssigned // The latest assignments info got from streaming coord balance module. nodeChangedNotifier *syncutil.VersionedNotifier // used to notify that node in streaming node manager has been changed. + previousNodeIDs typeutil.UniqueSet // used to store the previous node ids. } // GetBalancer returns the balancer of the streaming node manager. @@ -151,12 +152,15 @@ func (s *StreamingNodeManager) GetStreamingQueryNodeIDs() typeutil.UniqueSet { } streamingNodes, err := balancer.GetAllStreamingNodes(context.Background()) if err != nil { - panic(err) + // when the streaming coord is on shutdown, the balancer will return an error, + // causing panic, so we need to return the previous node ids. + return s.previousNodeIDs } streamingNodeIDs := typeutil.NewUniqueSet() for _, streamingNode := range streamingNodes { streamingNodeIDs.Insert(streamingNode.ServerID) } + s.previousNodeIDs = streamingNodeIDs return streamingNodeIDs }