
Merge pull request #1131 from mozilla-services/fix/1090
fix: make autopush_rs's ctrl-c work gracefully
bbangert authored Feb 14, 2018
2 parents ffd3a8d + 624de2d commit 976b03c
Showing 5 changed files with 72 additions and 82 deletions.
46 changes: 13 additions & 33 deletions autopush/main.py
@@ -1,5 +1,6 @@
"""autopush/autoendpoint daemon scripts"""
import os
import time
from argparse import Namespace # noqa

from twisted.application.internet import (
@@ -148,14 +149,13 @@ def main(cls, args=None, use_files=True, resource=None):
firehose_delivery_stream=ns.firehose_stream_name
)
try:
cls.argparse = cls.from_argparse(ns, resource=resource)
app = cls.argparse
app = cls.from_argparse(ns, resource=resource)
except InvalidConfig as e:
log.critical(str(e))
return 1

app.setup()
app.run()
return app.run()


class EndpointApplication(AutopushMultiService):
@@ -345,9 +345,17 @@ def setup(self, rotate_tables=True):

def run(self):
try:
self.push_server.run()
self.push_server.start()
while True:
try:
# handle a graceful shutdown on SIGINT w/ a busy
# loop. we can't Thread.join because SIGINT won't
# interrupt it
time.sleep(6000)
except KeyboardInterrupt:
return 1
finally:
self.stopService()
self.push_server.stop()

@inlineCallbacks
def stopService(self):
@@ -380,31 +388,3 @@ def from_argparse(cls, ns, resource=None):
aws_ddb_endpoint=ns.aws_ddb_endpoint,
resource=resource
)

@classmethod
def main(cls, args=None, use_files=True, resource=None):
# type: (Sequence[str], bool, DynamoDBResource) -> Any
"""Entry point to autopush's main command line scripts.
aka autopush/autoendpoint.
"""
ns = cls.parse_args(cls.config_files if use_files else [], args)
if not ns.no_aws:
logging.HOSTNAME = utils.get_ec2_instance_id()
PushLogger.setup_logging(
cls.logger_name,
log_level=ns.log_level or ("debug" if ns.debug else "info"),
log_format="text" if ns.human_logs else "json",
log_output=ns.log_output,
sentry_dsn=bool(os.environ.get("SENTRY_DSN")),
firehose_delivery_stream=ns.firehose_stream_name
)
try:
app = cls.from_argparse(ns, resource=resource)
except InvalidConfig as e:
log.critical(str(e))
return 1

app.setup()
app.run()
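
The busy loop in run() above is the heart of the fix: as the in-code comment notes, a bare Thread.join() is not interrupted by SIGINT, so the main thread instead sleeps in a loop and lets Ctrl-C surface as KeyboardInterrupt, with the server stopped in the finally block. A minimal, standalone sketch of the same pattern (the worker function here is hypothetical, not part of autopush):

import time
from threading import Thread


def worker():
    # Hypothetical stand-in for the push server's worker threads.
    while True:
        time.sleep(1)


def main():
    t = Thread(target=worker)
    t.daemon = True
    t.start()
    try:
        while True:
            try:
                # Sleep in a loop instead of t.join(): join() blocks in a way
                # SIGINT does not interrupt, so Ctrl-C would never be seen here.
                time.sleep(6000)
            except KeyboardInterrupt:
                return 1
    finally:
        # Shutdown would go here; the real code calls self.push_server.stop().
        pass


if __name__ == "__main__":
    exit(main())

Returning 1 from the KeyboardInterrupt branch mirrors the change in main.py, where app.run()'s return value is now propagated as the process exit code via "return app.run()".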
4 changes: 1 addition & 3 deletions autopush/tests/test_rs_integration.py
@@ -119,9 +119,7 @@ def setUp(self):
)
self.conn = WebPushServer(conn_conf, db, num_threads=2)
self.conn.start()

def tearDown(self):
self.conn.stop()
self.addCleanup(self.conn.stop)

def endpoint_kwargs(self):
return self._endpoint_defaults
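
Swapping tearDown for addCleanup registers the stop immediately after the server starts, so it still runs if a later step of setUp raises (tearDown is skipped when setUp fails), and cleanups run in LIFO order. A small illustration with a hypothetical FakeServer, not the repo's WebPushServer:

import unittest


class FakeServer(object):
    # Hypothetical stand-in for the real server in this sketch.
    def __init__(self):
        self.running = False

    def start(self):
        self.running = True

    def stop(self):
        self.running = False


class ExampleTest(unittest.TestCase):
    def setUp(self):
        self.server = FakeServer()
        self.server.start()
        # Runs after the test even though there is no tearDown, and also runs
        # if a later part of setUp were to fail.
        self.addCleanup(self.server.stop)

    def test_running(self):
        self.assertTrue(self.server.running)


if __name__ == "__main__":
    unittest.main()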
7 changes: 2 additions & 5 deletions autopush/webpush_server.py
@@ -243,11 +243,6 @@ def __init__(self, conf, db, num_threads=10):
self.rust = AutopushServer(conf, self.incoming)
self.running = False

def run(self):
self.start()
for worker in self.workers:
worker.join()

def start(self):
# type: (int) -> None
self.running = True
@@ -263,6 +258,8 @@ def start(self):
def stop(self):
self.running = False
self.rust.stopService()
for worker in self.workers:
worker.join()

def _create_thread_worker(self, processor, input_queue):
# type: (CommandProcessor, AutopushQueue) -> Thread
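
With WebPushServer.run() removed, the caller owns the lifecycle: it calls start(), decides when to shut down, and stop() both stops the Rust service and joins the worker threads in one place. A rough sketch of that start()/stop() shape, with a trivial stand-in loop instead of the real command processors:

import time
from threading import Thread


class MiniServer(object):
    """Sketch of the start()/stop() split: stop() signals, then joins."""

    def __init__(self, num_threads=2):
        self.num_threads = num_threads
        self.workers = []
        self.running = False

    def start(self):
        self.running = True
        for _ in range(self.num_threads):
            t = Thread(target=self._work)
            t.start()
            self.workers.append(t)

    def stop(self):
        # Flip the flag so the loops exit, then join each worker; joining here
        # (rather than in a blocking run()) keeps shutdown in one place.
        self.running = False
        for worker in self.workers:
            worker.join()

    def _work(self):
        while self.running:
            time.sleep(0.1)  # stand-in for pulling commands off a queue


if __name__ == "__main__":
    server = MiniServer()
    server.start()
    time.sleep(0.5)
    server.stop()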
2 changes: 2 additions & 0 deletions autopush_rs/src/errors.rs
@@ -31,6 +31,7 @@ use cadence;
use futures::Future;
use httparse;
use serde_json;
use sentry;
use tungstenite;

error_chain! {
@@ -40,6 +41,7 @@ error_chain! {
Json(serde_json::Error);
Httparse(httparse::Error);
MetricError(cadence::MetricError);
SentryError(sentry::Error);
}

errors {
95 changes: 54 additions & 41 deletions autopush_rs/src/server/mod.rs
@@ -1,5 +1,6 @@
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::env;
use std::ffi::CStr;
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
@@ -12,7 +13,6 @@ use std::thread;
use std::time::{Instant, Duration};

use cadence::StatsdClient;
use futures;
use futures::sync::oneshot;
use futures::task;
use futures::{Stream, Future, Sink, Async, Poll, AsyncSink, StartSend};
@@ -55,11 +55,13 @@ pub struct AutopushServer {
inner: UnwindGuard<AutopushServerInner>,
}

// a signaler to shut down a tokio Core and its associated thread
struct ShutdownHandle(oneshot::Sender<()>, thread::JoinHandle<()>);

struct AutopushServerInner {
opts: Arc<ServerOptions>,
// Used when shutting down a server
tx: Cell<Option<oneshot::Sender<()>>>,
thread: Cell<Option<thread::JoinHandle<()>>>,
shutdown_handles: Cell<Option<Vec<ShutdownHandle>>>,
}

#[repr(C)]
@@ -191,8 +193,7 @@ pub extern "C" fn autopush_server_new(
Box::new(AutopushServer {
inner: UnwindGuard::new(AutopushServerInner {
opts: Arc::new(opts),
tx: Cell::new(None),
thread: Cell::new(None),
shutdown_handles: Cell::new(None),
}),
})
})
@@ -207,9 +208,8 @@ pub extern "C" fn autopush_server_start(
unsafe {
(*srv).inner.catch(err, |srv| {
let tx = (*queue).tx();
let (tx, thread) = Server::start(&srv.opts, tx).expect("failed to start server");
srv.tx.set(Some(tx));
srv.thread.set(Some(thread));
let handles = Server::start(&srv.opts, tx).expect("failed to start server");
srv.shutdown_handles.set(Some(handles));
})
}
}
@@ -234,11 +234,16 @@ impl AutopushServerInner {
/// Blocks execution of the calling thread until the helper thread with the
/// tokio reactor has exited.
fn stop(&self) -> Result<()> {
drop(self.tx.take());
if let Some(thread) = self.thread.take() {
thread.join().map_err(ErrorKind::Thread)?;
let mut result = Ok(());
if let Some(shutdown_handles) = self.shutdown_handles.take() {
for ShutdownHandle(tx, thread) in shutdown_handles {
let _ = tx.send(());
if let Err(err) = thread.join() {
result = Err(From::from(ErrorKind::Thread(err)));
}
}
}
Ok(())
result
}
}

@@ -252,15 +257,16 @@ impl Server {
/// Creates a new server handle to send to python.
///
/// This will spawn a new server with the `opts` specified, spinning up a
/// separate thread for the tokio reactor. The returned
/// `AutopushServerInner` is a handle to the spawned thread and can be used
/// to interact with it (e.g. shut it down).
fn start(
opts: &Arc<ServerOptions>,
tx: queue::Sender,
) -> Result<(oneshot::Sender<()>, thread::JoinHandle<()>)> {
let (donetx, donerx) = oneshot::channel();
/// separate thread for the tokio reactor. The returned ShutdownHandles can
/// be used to interact with it (e.g. shut it down).
fn start(opts: &Arc<ServerOptions>, tx: queue::Sender) -> Result<Vec<ShutdownHandle>> {
let mut shutdown_handles = vec![];
if let Some(handle) = Server::start_sentry()? {
shutdown_handles.push(handle);
}

let (inittx, initrx) = oneshot::channel();
let (donetx, donerx) = oneshot::channel();

let opts = opts.clone();
let thread = thread::spawn(move || {
@@ -294,36 +300,43 @@ impl Server {
}));
}

drop(core.run(donerx));
core.run(donerx).expect("Main Core run error");
});

match initrx.wait() {
Ok(Some(e)) => Err(e),
Ok(None) => Ok((donetx, thread)),
Ok(None) => {
shutdown_handles.push(ShutdownHandle(donetx, thread));
Ok(shutdown_handles)
}
Err(_) => panic::resume_unwind(thread.join().unwrap_err()),
}
}

fn new(opts: &Arc<ServerOptions>, tx: queue::Sender) -> Result<(Rc<Server>, Core)> {
// Setup Sentry logging if a SENTRY_DSN exists
let sentry_dsn_option = option_env!("SENTRY_DSN");
if let Some(sentry_dsn) = sentry_dsn_option {
// Spin up a new thread with a new reactor core for the sentry handler
thread::spawn(move || {
let creds = sentry_dsn
.parse::<sentry::SentryCredential>()
.expect("Invalid Sentry DSN specified");
let mut core = Core::new().expect("Unable to create core");
let sentry = sentry::Sentry::from_settings(core.handle(), Default::default(), creds);
// Get the prior panic hook
let hook = panic::take_hook();
sentry.register_panic_handler(Some(move |info: &PanicInfo| -> () {
hook(info);
}));
core.run(futures::empty::<(), ()>()).expect("Error starting sentry thread");
});
}
/// Setup Sentry logging if a SENTRY_DSN exists
fn start_sentry() -> Result<Option<ShutdownHandle>> {
let creds = match env::var("SENTRY_DSN") {
Ok(dsn) => dsn.parse::<sentry::SentryCredential>()?,
Err(_) => return Ok(None),
};

// Spin up a new thread with a new reactor core for the sentry handler
let (donetx, donerx) = oneshot::channel();
let thread = thread::spawn(move || {
let mut core = Core::new().expect("Unable to create core");
let sentry = sentry::Sentry::from_settings(core.handle(), Default::default(), creds);
// Get the prior panic hook
let hook = panic::take_hook();
sentry.register_panic_handler(Some(move |info: &PanicInfo| -> () {
hook(info);
}));
core.run(donerx).expect("Sentry Core run error");
});

Ok(Some(ShutdownHandle(donetx, thread)))
}

fn new(opts: &Arc<ServerOptions>, tx: queue::Sender) -> Result<(Rc<Server>, Core)> {
let core = Core::new()?;
let srv = Rc::new(Server {
opts: opts.clone(),
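
On the Rust side, each background thread (the optional Sentry reactor and the main tokio reactor) now hands back a ShutdownHandle pairing a oneshot sender with the thread's JoinHandle; stop() sends on each channel and joins each thread, keeping any join error instead of swallowing it. The same signal-then-join idea, sketched in Python with a threading.Event purely for illustration (none of these names are in the repo):

from threading import Event, Thread


class ShutdownHandle(object):
    """Pairs a shutdown signal with the thread it controls, loosely mirroring
    the Rust ShutdownHandle(oneshot::Sender<()>, thread::JoinHandle<()>)."""

    def __init__(self, target):
        self.done = Event()
        self.thread = Thread(target=target, args=(self.done,))
        self.thread.start()

    def stop(self):
        self.done.set()      # analogous to tx.send(())
        self.thread.join()   # analogous to thread.join()


def reactor_loop(done):
    # Stand-in for a thread running an event loop until the signal fires.
    while not done.wait(0.1):
        pass


if __name__ == "__main__":
    handles = [ShutdownHandle(reactor_loop) for _ in range(2)]
    for handle in handles:
        handle.stop()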
