From 1e74f9638c6963e4a9bcac178fcd929a05ffd61a Mon Sep 17 00:00:00 2001 From: Ben Bangert Date: Tue, 27 Mar 2018 16:15:01 -0700 Subject: [PATCH] feat: add megaphone api token auth Closes #1164 --- autopush/config.py | 1 + autopush/main.py | 1 + autopush/main_argparse.py | 3 +++ autopush/tests/test_rs_integration.py | 3 +++ autopush_rs/__init__.py | 1 + autopush_rs/src/server/mod.rs | 22 ++++++++++++++++++---- autopush_rs/src/util/megaphone.rs | 3 ++- configs/autopush_connection.ini.sample | 12 ++++++++++++ 8 files changed, 41 insertions(+), 5 deletions(-) diff --git a/autopush/config.py b/autopush/config.py index 4906442c..08e0e80b 100644 --- a/autopush/config.py +++ b/autopush/config.py @@ -122,6 +122,7 @@ class AutopushConfig(object): statsd_host = attrib(default="localhost") # type: str statsd_port = attrib(default=8125) # type: int megaphone_api_url = attrib(default=None) # type: Optional[str] + megaphone_api_token = attrib(default=None) # type: Optional[str] megaphone_poll_interval = attrib(default=30) # type: int datadog_api_key = attrib(default=None) # type: Optional[str] diff --git a/autopush/main.py b/autopush/main.py index 2cc6a317..bf228faf 100644 --- a/autopush/main.py +++ b/autopush/main.py @@ -381,6 +381,7 @@ def from_argparse(cls, ns, resource=None): # pragma: nocover close_handshake_timeout=ns.close_handshake_timeout, aws_ddb_endpoint=ns.aws_ddb_endpoint, megaphone_api_url=ns.megaphone_api_url, + megaphone_api_token=ns.megaphone_api_token, megaphone_poll_interval=ns.megaphone_poll_interval, resource=resource ) diff --git a/autopush/main_argparse.py b/autopush/main_argparse.py index 4554ad9c..7572963c 100644 --- a/autopush/main_argparse.py +++ b/autopush/main_argparse.py @@ -231,6 +231,9 @@ def parse_connection(config_files, args): parser.add_argument('--megaphone_api_url', help="The megaphone API URL to query for updates", default=None, type=str, env_var="MEGAPHONE_API_URL") + parser.add_argument('--megaphone_api_token', + help="The megaphone API token", + default=None, type=str, env_var="MEGAPHONE_API_TOKEN") parser.add_argument('--megaphone_poll_interval', help="The megaphone API polling interval", default=30, type=int, diff --git a/autopush/tests/test_rs_integration.py b/autopush/tests/test_rs_integration.py index 927e1a51..56520c22 100644 --- a/autopush/tests/test_rs_integration.py +++ b/autopush/tests/test_rs_integration.py @@ -69,9 +69,11 @@ class MockMegaphoneRequestHandler(BaseHTTPRequestHandler): API_PATTERN = re.compile(r'/v1/broadcasts') services = {} polled = Event() + token = "Bearer {}".format(uuid.uuid4().hex) def do_GET(self): if re.search(self.API_PATTERN, self.path): + assert self.headers.getheader("Authorization") == self.token self.send_response(requests.codes.ok) self.send_header('Content-Type', 'application/json; charset=utf-8') self.end_headers() @@ -806,6 +808,7 @@ def setUp(self): close_handshake_timeout=5, max_connections=5000, megaphone_api_url=megaphone_api_url, + megaphone_api_token=MockMegaphoneRequestHandler.token, megaphone_poll_interval=1, **self.conn_kwargs() ) diff --git a/autopush_rs/__init__.py b/autopush_rs/__init__.py index 79ed2a15..319a1e2c 100644 --- a/autopush_rs/__init__.py +++ b/autopush_rs/__init__.py @@ -36,6 +36,7 @@ def __init__(self, conf, queue): cfg.statsd_host = ffi_from_buffer(conf.statsd_host) cfg.statsd_port = conf.statsd_port cfg.megaphone_api_url = ffi_from_buffer(conf.megaphone_api_url) + cfg.megaphone_api_token = ffi_from_buffer(conf.megaphone_api_token) cfg.megaphone_poll_interval = conf.megaphone_poll_interval ptr = _call(lib.autopush_server_new, cfg) diff --git a/autopush_rs/src/server/mod.rs b/autopush_rs/src/server/mod.rs index 8f85836a..af8240b1 100644 --- a/autopush_rs/src/server/mod.rs +++ b/autopush_rs/src/server/mod.rs @@ -17,6 +17,7 @@ use futures::sync::oneshot; use futures::task; use futures::{Stream, Future, Sink, Async, Poll, AsyncSink, StartSend}; use hyper; +use hyper::header; use hyper::server::Http; use libc::c_char; use openssl::ssl::SslAcceptor; @@ -83,6 +84,7 @@ pub struct AutopushServerOptions { pub statsd_host: *const c_char, pub statsd_port: u16, pub megaphone_api_url: *const c_char, + pub megaphone_api_token: *const c_char, pub megaphone_poll_interval: u32, } @@ -113,6 +115,7 @@ pub struct ServerOptions { pub statsd_port: u16, pub logger: util::LogGuards, pub megaphone_api_url: Option, + pub megaphone_api_token: Option, pub megaphone_poll_interval: Duration, } @@ -174,6 +177,7 @@ pub extern "C" fn autopush_server_new( open_handshake_timeout: ito_dur(opts.open_handshake_timeout), logger: logger, megaphone_api_url: to_s(opts.megaphone_api_url).map(|s| s.to_string()), + megaphone_api_token: to_s(opts.megaphone_api_token).map(|s| s.to_string()), megaphone_poll_interval: ito_dur(opts.megaphone_poll_interval).expect("poll interval cannot be 0"), }; @@ -326,7 +330,10 @@ impl Server { fn new(opts: &Arc, tx: queue::Sender) -> Result<(Rc, Core)> { let core = Core::new()?; let broadcaster = if let Some(ref megaphone_url) = opts.megaphone_api_url { - ServiceChangeTracker::with_api_services(megaphone_url) + let megaphone_token = opts.megaphone_api_token.as_ref().expect( + "Megaphone API requires a Megaphone API Token to be set" + ); + ServiceChangeTracker::with_api_services(megaphone_url, megaphone_token) .expect("Unable to initialize megaphone with provided URL".into()) } else { ServiceChangeTracker::new(Vec::new()) @@ -437,8 +444,11 @@ impl Server { }); if let Some(ref megaphone_url) = opts.megaphone_api_url { + let megaphone_token = opts.megaphone_api_token.as_ref().expect( + "Megaphone API requires a Megaphone API Token to be set" + ); let fut = MegaphoneUpdater::new( - megaphone_url, opts.megaphone_poll_interval, &srv2, + megaphone_url, megaphone_token, opts.megaphone_poll_interval, &srv2, ) .expect("Unable to start megaphone updater".into()); core.handle().spawn(fut.then(|res| { @@ -548,6 +558,7 @@ enum MegaphoneState { struct MegaphoneUpdater { srv: Rc, api_url: String, + api_token: String, state: MegaphoneState, timeout: Timeout, poll_interval: Duration, @@ -555,7 +566,7 @@ struct MegaphoneUpdater { } impl MegaphoneUpdater { - fn new(uri: &str, poll_interval: Duration, srv: &Rc) -> io::Result { + fn new(uri: &str, token: &str, poll_interval: Duration, srv: &Rc) -> io::Result { let client = reqwest::unstable::async::Client::builder() .timeout(Duration::from_secs(1)) .build(&srv.handle) @@ -563,6 +574,7 @@ impl MegaphoneUpdater { Ok(MegaphoneUpdater { srv: srv.clone(), api_url: uri.to_string(), + api_token: token.to_string(), state: MegaphoneState::Waiting, timeout: Timeout::new(poll_interval, &srv.handle)?, poll_interval, @@ -581,7 +593,9 @@ impl Future for MegaphoneUpdater { MegaphoneState::Waiting => { try_ready!(self.timeout.poll()); debug!("Sending megaphone API request"); - let fut = self.client.get(&self.api_url).send() + let fut = self.client.get(&self.api_url) + .header(header::Authorization(self.api_token.clone())) + .send() .and_then(|response| response.error_for_status()) .and_then(|mut response| response.json()) .map_err(|_| "Unable to query/decode the API query".into()); diff --git a/autopush_rs/src/util/megaphone.rs b/autopush_rs/src/util/megaphone.rs index ed549943..47098098 100644 --- a/autopush_rs/src/util/megaphone.rs +++ b/autopush_rs/src/util/megaphone.rs @@ -126,11 +126,12 @@ impl ServiceChangeTracker { /// as provided as the fetch URL. /// /// This method uses a synchronous HTTP call. - pub fn with_api_services(url: &str) -> reqwest::Result { + pub fn with_api_services(url: &str, token: &str) -> reqwest::Result { let client = reqwest::Client::builder() .timeout(Duration::from_secs(1)) .build()?; let MegaphoneAPIResponse { broadcasts } = client.get(url) + .header(reqwest::header::Authorization(token.to_string())) .send()? .error_for_status()? .json()?; diff --git a/configs/autopush_connection.ini.sample b/configs/autopush_connection.ini.sample index d749a0b5..f75e22d9 100644 --- a/configs/autopush_connection.ini.sample +++ b/configs/autopush_connection.ini.sample @@ -43,3 +43,15 @@ router_port = 8081 ; The client handshake timeout, in seconds. Clients that fail to send a ; handshake before the timeout will be disconnected. Set to 0 to disable. hello_timeout = 0 + +; Autopush-rs only settings +; +; Megaphone API URL +#megaphone_api_url = https://megaphone.services.mozilla.com/v1/broadcasts + +; Megaphone API Token +#megaphone_api_token = Bearer some_token + +; Megaphone polling interval, determines how frequently the connection node +; will poll megaphone for service id's +#megaphone_poll_interval = 30