Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: add megaphone api token auth
Browse files Browse the repository at this point in the history
Closes #1164
  • Loading branch information
bbangert committed Mar 28, 2018
1 parent 6720bbf commit 1e74f96
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 5 deletions.
1 change: 1 addition & 0 deletions autopush/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class AutopushConfig(object):
statsd_host = attrib(default="localhost") # type: str
statsd_port = attrib(default=8125) # type: int
megaphone_api_url = attrib(default=None) # type: Optional[str]
megaphone_api_token = attrib(default=None) # type: Optional[str]
megaphone_poll_interval = attrib(default=30) # type: int

datadog_api_key = attrib(default=None) # type: Optional[str]
Expand Down
1 change: 1 addition & 0 deletions autopush/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ def from_argparse(cls, ns, resource=None): # pragma: nocover
close_handshake_timeout=ns.close_handshake_timeout,
aws_ddb_endpoint=ns.aws_ddb_endpoint,
megaphone_api_url=ns.megaphone_api_url,
megaphone_api_token=ns.megaphone_api_token,
megaphone_poll_interval=ns.megaphone_poll_interval,
resource=resource
)
3 changes: 3 additions & 0 deletions autopush/main_argparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ def parse_connection(config_files, args):
parser.add_argument('--megaphone_api_url',
help="The megaphone API URL to query for updates",
default=None, type=str, env_var="MEGAPHONE_API_URL")
parser.add_argument('--megaphone_api_token',
help="The megaphone API token",
default=None, type=str, env_var="MEGAPHONE_API_TOKEN")
parser.add_argument('--megaphone_poll_interval',
help="The megaphone API polling interval",
default=30, type=int,
Expand Down
3 changes: 3 additions & 0 deletions autopush/tests/test_rs_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ class MockMegaphoneRequestHandler(BaseHTTPRequestHandler):
API_PATTERN = re.compile(r'/v1/broadcasts')
services = {}
polled = Event()
token = "Bearer {}".format(uuid.uuid4().hex)

def do_GET(self):
if re.search(self.API_PATTERN, self.path):
assert self.headers.getheader("Authorization") == self.token
self.send_response(requests.codes.ok)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
Expand Down Expand Up @@ -806,6 +808,7 @@ def setUp(self):
close_handshake_timeout=5,
max_connections=5000,
megaphone_api_url=megaphone_api_url,
megaphone_api_token=MockMegaphoneRequestHandler.token,
megaphone_poll_interval=1,
**self.conn_kwargs()
)
Expand Down
1 change: 1 addition & 0 deletions autopush_rs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(self, conf, queue):
cfg.statsd_host = ffi_from_buffer(conf.statsd_host)
cfg.statsd_port = conf.statsd_port
cfg.megaphone_api_url = ffi_from_buffer(conf.megaphone_api_url)
cfg.megaphone_api_token = ffi_from_buffer(conf.megaphone_api_token)
cfg.megaphone_poll_interval = conf.megaphone_poll_interval

ptr = _call(lib.autopush_server_new, cfg)
Expand Down
22 changes: 18 additions & 4 deletions autopush_rs/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use futures::sync::oneshot;
use futures::task;
use futures::{Stream, Future, Sink, Async, Poll, AsyncSink, StartSend};
use hyper;
use hyper::header;
use hyper::server::Http;
use libc::c_char;
use openssl::ssl::SslAcceptor;
Expand Down Expand Up @@ -83,6 +84,7 @@ pub struct AutopushServerOptions {
pub statsd_host: *const c_char,
pub statsd_port: u16,
pub megaphone_api_url: *const c_char,
pub megaphone_api_token: *const c_char,
pub megaphone_poll_interval: u32,
}

Expand Down Expand Up @@ -113,6 +115,7 @@ pub struct ServerOptions {
pub statsd_port: u16,
pub logger: util::LogGuards,
pub megaphone_api_url: Option<String>,
pub megaphone_api_token: Option<String>,
pub megaphone_poll_interval: Duration,
}

Expand Down Expand Up @@ -174,6 +177,7 @@ pub extern "C" fn autopush_server_new(
open_handshake_timeout: ito_dur(opts.open_handshake_timeout),
logger: logger,
megaphone_api_url: to_s(opts.megaphone_api_url).map(|s| s.to_string()),
megaphone_api_token: to_s(opts.megaphone_api_token).map(|s| s.to_string()),
megaphone_poll_interval: ito_dur(opts.megaphone_poll_interval).expect("poll interval cannot be 0"),
};

Expand Down Expand Up @@ -326,7 +330,10 @@ impl Server {
fn new(opts: &Arc<ServerOptions>, tx: queue::Sender) -> Result<(Rc<Server>, Core)> {
let core = Core::new()?;
let broadcaster = if let Some(ref megaphone_url) = opts.megaphone_api_url {
ServiceChangeTracker::with_api_services(megaphone_url)
let megaphone_token = opts.megaphone_api_token.as_ref().expect(
"Megaphone API requires a Megaphone API Token to be set"
);
ServiceChangeTracker::with_api_services(megaphone_url, megaphone_token)
.expect("Unable to initialize megaphone with provided URL".into())
} else {
ServiceChangeTracker::new(Vec::new())
Expand Down Expand Up @@ -437,8 +444,11 @@ impl Server {
});

if let Some(ref megaphone_url) = opts.megaphone_api_url {
let megaphone_token = opts.megaphone_api_token.as_ref().expect(
"Megaphone API requires a Megaphone API Token to be set"
);
let fut = MegaphoneUpdater::new(
megaphone_url, opts.megaphone_poll_interval, &srv2,
megaphone_url, megaphone_token, opts.megaphone_poll_interval, &srv2,
)
.expect("Unable to start megaphone updater".into());
core.handle().spawn(fut.then(|res| {
Expand Down Expand Up @@ -548,21 +558,23 @@ enum MegaphoneState {
struct MegaphoneUpdater {
srv: Rc<Server>,
api_url: String,
api_token: String,
state: MegaphoneState,
timeout: Timeout,
poll_interval: Duration,
client: reqwest::unstable::async::Client,
}

impl MegaphoneUpdater {
fn new(uri: &str, poll_interval: Duration, srv: &Rc<Server>) -> io::Result<MegaphoneUpdater> {
fn new(uri: &str, token: &str, poll_interval: Duration, srv: &Rc<Server>) -> io::Result<MegaphoneUpdater> {
let client = reqwest::unstable::async::Client::builder()
.timeout(Duration::from_secs(1))
.build(&srv.handle)
.expect("Unable to build reqwest client".into());
Ok(MegaphoneUpdater {
srv: srv.clone(),
api_url: uri.to_string(),
api_token: token.to_string(),
state: MegaphoneState::Waiting,
timeout: Timeout::new(poll_interval, &srv.handle)?,
poll_interval,
Expand All @@ -581,7 +593,9 @@ impl Future for MegaphoneUpdater {
MegaphoneState::Waiting => {
try_ready!(self.timeout.poll());
debug!("Sending megaphone API request");
let fut = self.client.get(&self.api_url).send()
let fut = self.client.get(&self.api_url)
.header(header::Authorization(self.api_token.clone()))
.send()
.and_then(|response| response.error_for_status())
.and_then(|mut response| response.json())
.map_err(|_| "Unable to query/decode the API query".into());
Expand Down
3 changes: 2 additions & 1 deletion autopush_rs/src/util/megaphone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,12 @@ impl ServiceChangeTracker {
/// as provided as the fetch URL.
///
/// This method uses a synchronous HTTP call.
pub fn with_api_services(url: &str) -> reqwest::Result<ServiceChangeTracker> {
pub fn with_api_services(url: &str, token: &str) -> reqwest::Result<ServiceChangeTracker> {
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(1))
.build()?;
let MegaphoneAPIResponse { broadcasts } = client.get(url)
.header(reqwest::header::Authorization(token.to_string()))
.send()?
.error_for_status()?
.json()?;
Expand Down
12 changes: 12 additions & 0 deletions configs/autopush_connection.ini.sample
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,15 @@ router_port = 8081
; The client handshake timeout, in seconds. Clients that fail to send a
; handshake before the timeout will be disconnected. Set to 0 to disable.
hello_timeout = 0

; Autopush-rs only settings
;
; Megaphone API URL
#megaphone_api_url = https://megaphone.services.mozilla.com/v1/broadcasts

; Megaphone API Token
#megaphone_api_token = Bearer some_token

; Megaphone polling interval, determines how frequently the connection node
; will poll megaphone for service id's
#megaphone_poll_interval = 30

0 comments on commit 1e74f96

Please sign in to comment.