From 624de2dceb8e8f22916981f1273b86d1079ef1f9 Mon Sep 17 00:00:00 2001 From: Philip Jenvey Date: Tue, 6 Feb 2018 14:24:50 -0800 Subject: [PATCH] fix: make autopush_rs's ctrl-c work gracefully also include the rust sentry thread in the shutdown process and fix pulling SENTRY_DSN from build vs runtime env Closes #1090 --- autopush/main.py | 46 ++++--------- autopush/tests/test_rs_integration.py | 4 +- autopush/webpush_server.py | 7 +- autopush_rs/src/errors.rs | 2 + autopush_rs/src/server/mod.rs | 95 +++++++++++++++------------ 5 files changed, 72 insertions(+), 82 deletions(-) diff --git a/autopush/main.py b/autopush/main.py index e51778df..d9462ee8 100644 --- a/autopush/main.py +++ b/autopush/main.py @@ -1,5 +1,6 @@ """autopush/autoendpoint daemon scripts""" import os +import time from argparse import Namespace # noqa from twisted.application.internet import ( @@ -148,14 +149,13 @@ def main(cls, args=None, use_files=True, resource=None): firehose_delivery_stream=ns.firehose_stream_name ) try: - cls.argparse = cls.from_argparse(ns, resource=resource) - app = cls.argparse + app = cls.from_argparse(ns, resource=resource) except InvalidConfig as e: log.critical(str(e)) return 1 app.setup() - app.run() + return app.run() class EndpointApplication(AutopushMultiService): @@ -345,9 +345,17 @@ def setup(self, rotate_tables=True): def run(self): try: - self.push_server.run() + self.push_server.start() + while True: + try: + # handle a graceful shutdown on SIGINT w/ a busy + # loop. we can't Thread.join because SIGINT won't + # interrupt it + time.sleep(6000) + except KeyboardInterrupt: + return 1 finally: - self.stopService() + self.push_server.stop() @inlineCallbacks def stopService(self): @@ -380,31 +388,3 @@ def from_argparse(cls, ns, resource=None): aws_ddb_endpoint=ns.aws_ddb_endpoint, resource=resource ) - - @classmethod - def main(cls, args=None, use_files=True, resource=None): - # type: (Sequence[str], bool, DynamoDBResource) -> Any - """Entry point to autopush's main command line scripts. - - aka autopush/autoendpoint. - - """ - ns = cls.parse_args(cls.config_files if use_files else [], args) - if not ns.no_aws: - logging.HOSTNAME = utils.get_ec2_instance_id() - PushLogger.setup_logging( - cls.logger_name, - log_level=ns.log_level or ("debug" if ns.debug else "info"), - log_format="text" if ns.human_logs else "json", - log_output=ns.log_output, - sentry_dsn=bool(os.environ.get("SENTRY_DSN")), - firehose_delivery_stream=ns.firehose_stream_name - ) - try: - app = cls.from_argparse(ns, resource=resource) - except InvalidConfig as e: - log.critical(str(e)) - return 1 - - app.setup() - app.run() diff --git a/autopush/tests/test_rs_integration.py b/autopush/tests/test_rs_integration.py index b75d181a..b8acdb09 100644 --- a/autopush/tests/test_rs_integration.py +++ b/autopush/tests/test_rs_integration.py @@ -119,9 +119,7 @@ def setUp(self): ) self.conn = WebPushServer(conn_conf, db, num_threads=2) self.conn.start() - - def tearDown(self): - self.conn.stop() + self.addCleanup(self.conn.stop) def endpoint_kwargs(self): return self._endpoint_defaults diff --git a/autopush/webpush_server.py b/autopush/webpush_server.py index 5748f127..2798da41 100644 --- a/autopush/webpush_server.py +++ b/autopush/webpush_server.py @@ -243,11 +243,6 @@ def __init__(self, conf, db, num_threads=10): self.rust = AutopushServer(conf, self.incoming) self.running = False - def run(self): - self.start() - for worker in self.workers: - worker.join() - def start(self): # type: (int) -> None self.running = True @@ -263,6 +258,8 @@ def start(self): def stop(self): self.running = False self.rust.stopService() + for worker in self.workers: + worker.join() def _create_thread_worker(self, processor, input_queue): # type: (CommandProcessor, AutopushQueue) -> Thread diff --git a/autopush_rs/src/errors.rs b/autopush_rs/src/errors.rs index 32181a75..b6048807 100644 --- a/autopush_rs/src/errors.rs +++ b/autopush_rs/src/errors.rs @@ -31,6 +31,7 @@ use cadence; use futures::Future; use httparse; use serde_json; +use sentry; use tungstenite; error_chain! { @@ -40,6 +41,7 @@ error_chain! { Json(serde_json::Error); Httparse(httparse::Error); MetricError(cadence::MetricError); + SentryError(sentry::Error); } errors { diff --git a/autopush_rs/src/server/mod.rs b/autopush_rs/src/server/mod.rs index 669565ad..c4343210 100644 --- a/autopush_rs/src/server/mod.rs +++ b/autopush_rs/src/server/mod.rs @@ -1,5 +1,6 @@ use std::cell::{Cell, RefCell}; use std::collections::HashMap; +use std::env; use std::ffi::CStr; use std::io; use std::net::{SocketAddr, ToSocketAddrs}; @@ -12,7 +13,6 @@ use std::thread; use std::time::{Instant, Duration}; use cadence::StatsdClient; -use futures; use futures::sync::oneshot; use futures::task; use futures::{Stream, Future, Sink, Async, Poll, AsyncSink, StartSend}; @@ -55,11 +55,13 @@ pub struct AutopushServer { inner: UnwindGuard, } +// a signaler to shut down a tokio Core and its associated thread +struct ShutdownHandle(oneshot::Sender<()>, thread::JoinHandle<()>); + struct AutopushServerInner { opts: Arc, // Used when shutting down a server - tx: Cell>>, - thread: Cell>>, + shutdown_handles: Cell>>, } #[repr(C)] @@ -191,8 +193,7 @@ pub extern "C" fn autopush_server_new( Box::new(AutopushServer { inner: UnwindGuard::new(AutopushServerInner { opts: Arc::new(opts), - tx: Cell::new(None), - thread: Cell::new(None), + shutdown_handles: Cell::new(None), }), }) }) @@ -207,9 +208,8 @@ pub extern "C" fn autopush_server_start( unsafe { (*srv).inner.catch(err, |srv| { let tx = (*queue).tx(); - let (tx, thread) = Server::start(&srv.opts, tx).expect("failed to start server"); - srv.tx.set(Some(tx)); - srv.thread.set(Some(thread)); + let handles = Server::start(&srv.opts, tx).expect("failed to start server"); + srv.shutdown_handles.set(Some(handles)); }) } } @@ -234,11 +234,16 @@ impl AutopushServerInner { /// Blocks execution of the calling thread until the helper thread with the /// tokio reactor has exited. fn stop(&self) -> Result<()> { - drop(self.tx.take()); - if let Some(thread) = self.thread.take() { - thread.join().map_err(ErrorKind::Thread)?; + let mut result = Ok(()); + if let Some(shutdown_handles) = self.shutdown_handles.take() { + for ShutdownHandle(tx, thread) in shutdown_handles { + let _ = tx.send(()); + if let Err(err) = thread.join() { + result = Err(From::from(ErrorKind::Thread(err))); + } + } } - Ok(()) + result } } @@ -252,15 +257,16 @@ impl Server { /// Creates a new server handle to send to python. /// /// This will spawn a new server with the `opts` specified, spinning up a - /// separate thread for the tokio reactor. The returned - /// `AutopushServerInner` is a handle to the spawned thread and can be used - /// to interact with it (e.g. shut it down). - fn start( - opts: &Arc, - tx: queue::Sender, - ) -> Result<(oneshot::Sender<()>, thread::JoinHandle<()>)> { - let (donetx, donerx) = oneshot::channel(); + /// separate thread for the tokio reactor. The returned ShutdownHandles can + /// be used to interact with it (e.g. shut it down). + fn start(opts: &Arc, tx: queue::Sender) -> Result> { + let mut shutdown_handles = vec![]; + if let Some(handle) = Server::start_sentry()? { + shutdown_handles.push(handle); + } + let (inittx, initrx) = oneshot::channel(); + let (donetx, donerx) = oneshot::channel(); let opts = opts.clone(); let thread = thread::spawn(move || { @@ -294,36 +300,43 @@ impl Server { })); } - drop(core.run(donerx)); + core.run(donerx).expect("Main Core run error"); }); match initrx.wait() { Ok(Some(e)) => Err(e), - Ok(None) => Ok((donetx, thread)), + Ok(None) => { + shutdown_handles.push(ShutdownHandle(donetx, thread)); + Ok(shutdown_handles) + } Err(_) => panic::resume_unwind(thread.join().unwrap_err()), } } - fn new(opts: &Arc, tx: queue::Sender) -> Result<(Rc, Core)> { - // Setup Sentry logging if a SENTRY_DSN exists - let sentry_dsn_option = option_env!("SENTRY_DSN"); - if let Some(sentry_dsn) = sentry_dsn_option { - // Spin up a new thread with a new reactor core for the sentry handler - thread::spawn(move || { - let creds = sentry_dsn - .parse::() - .expect("Invalid Sentry DSN specified"); - let mut core = Core::new().expect("Unable to create core"); - let sentry = sentry::Sentry::from_settings(core.handle(), Default::default(), creds); - // Get the prior panic hook - let hook = panic::take_hook(); - sentry.register_panic_handler(Some(move |info: &PanicInfo| -> () { - hook(info); - })); - core.run(futures::empty::<(), ()>()).expect("Error starting sentry thread"); - }); - } + /// Setup Sentry logging if a SENTRY_DSN exists + fn start_sentry() -> Result> { + let creds = match env::var("SENTRY_DSN") { + Ok(dsn) => dsn.parse::()?, + Err(_) => return Ok(None), + }; + + // Spin up a new thread with a new reactor core for the sentry handler + let (donetx, donerx) = oneshot::channel(); + let thread = thread::spawn(move || { + let mut core = Core::new().expect("Unable to create core"); + let sentry = sentry::Sentry::from_settings(core.handle(), Default::default(), creds); + // Get the prior panic hook + let hook = panic::take_hook(); + sentry.register_panic_handler(Some(move |info: &PanicInfo| -> () { + hook(info); + })); + core.run(donerx).expect("Sentry Core run error"); + }); + Ok(Some(ShutdownHandle(donetx, thread))) + } + + fn new(opts: &Arc, tx: queue::Sender) -> Result<(Rc, Core)> { let core = Core::new()?; let srv = Rc::new(Server { opts: opts.clone(),