Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: port register command to Rust
Browse files Browse the repository at this point in the history
Closes #1190
  • Loading branch information
bbangert committed May 10, 2018
1 parent af272df commit 35edd4f
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 30 deletions.
13 changes: 0 additions & 13 deletions autopush/tests/test_rs_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,19 +652,6 @@ def test_message_without_crypto_headers(self):
assert result is None
yield self.shut_down(client)

@inlineCallbacks
def test_message_with_topic(self):
data = str(uuid.uuid4())
self.conn.db.metrics = Mock(spec=SinkMetrics)
client = yield self.quick_register()
yield client.send_notification(data=data, topic="topicname")
self.conn.db.metrics.increment.assert_has_calls([
call('ua.command.register'),
# We can't see Rust metric calls
# call('ua.notification.topic')
])
yield self.shut_down(client)

@inlineCallbacks
def test_empty_message_without_crypto_headers(self):
client = yield self.quick_register()
Expand Down
15 changes: 15 additions & 0 deletions autopush_rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions autopush_rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ authors = ["Alex Crichton <alex@alexcrichton.com>"]
crate-type = ["cdylib"]

[dependencies]
base64 = "0.9.1"
bytes = "0.4.6"
cadence = "0.13.2"
chrono = "0.4.2"
env_logger = { version = "0.5.6", default-features = false }
error-chain = "0.11.0"
fernet = "0.1"
futures = "0.1.21"
futures-backoff = "0.1"
hex = "0.3.2"
hostname = "0.1.4"
httparse = "1.2.4"
hyper = "0.11.25"
Expand Down
7 changes: 6 additions & 1 deletion autopush_rs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ def __init__(self, conf, message_tables, queue):
cfg.port = conf.port
cfg.router_port = conf.router_port
cfg.router_url = ffi_from_buffer(conf.router_url)
cfg.endpoint_url = ffi_from_buffer(conf.endpoint_url)
self.crypto_key = ','.join(name.encode('utf-8') for name in
conf._crypto_key)
cfg.crypto_key = ffi_from_buffer(self.crypto_key)
cfg.ssl_cert = ffi_from_buffer(conf.ssl.cert)
cfg.ssl_dh_param = ffi_from_buffer(conf.ssl.dh_param)
cfg.ssl_key = ffi_from_buffer(conf.ssl.key)
Expand All @@ -38,7 +42,8 @@ def __init__(self, conf, message_tables, queue):
cfg.statsd_port = conf.statsd_port
cfg.router_table_name = ffi_from_buffer(conf.router_table.tablename)
# XXX: keepalive
self.message_table_names = ','.join(name.encode('utf-8') for name in message_tables)
self.message_table_names = ','.join(name.encode('utf-8') for name in
message_tables)
cfg.message_table_names = ffi_from_buffer(self.message_table_names)
cfg.megaphone_api_url = ffi_from_buffer(conf.megaphone_api_url)
cfg.megaphone_api_token = ffi_from_buffer(conf.megaphone_api_token)
Expand Down
22 changes: 9 additions & 13 deletions autopush_rs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use errors::*;
use protocol::{ClientMessage, Notification, ServerMessage, ServerNotification};
use server::Server;
use util::{ms_since_epoch, parse_user_agent, sec_since_epoch};
use util::ddb_helpers::{CheckStorageResponse, HelloResponse};
use util::ddb_helpers::{CheckStorageResponse, HelloResponse, RegisterResponse};
use util::megaphone::{ClientServices, Service, ServiceClientInit};

// Created and handed to the AutopushServer
Expand Down Expand Up @@ -599,7 +599,7 @@ where
#[state_machine_future(transitions(SendThenWait))]
AwaitRegister {
channel_id: Uuid,
response: MyFuture<call::RegisterResponse>,
response: MyFuture<RegisterResponse>,
data: AuthClientData<T>,
},

Expand Down Expand Up @@ -733,17 +733,15 @@ where
}
}
Either::A(ClientMessage::Register { channel_id, key }) => {
data.srv.metrics.incr("ua.command.register").ok();
debug!("Got a register command";
"channel_id" => channel_id.hyphenated().to_string());
let uaid = webpush.uaid.clone();
let message_month = webpush.message_month.clone();
let channel_id_str = channel_id.hyphenated().to_string();
let fut = data.srv.register(
uaid.simple().to_string(),
message_month,
channel_id_str,
key,
);
let srv = data.srv.clone();
let fut = data.srv
.ddb
.register(srv, &uaid, &channel_id, &message_month, key);
transition!(AwaitRegister {
channel_id,
response: fut,
Expand Down Expand Up @@ -960,7 +958,7 @@ where
) -> Poll<AfterAwaitRegister<T>, Error> {
debug!("State: AwaitRegister");
let msg = match try_ready!(await_register.response.poll()) {
call::RegisterResponse::Success { endpoint } => {
RegisterResponse::Success { endpoint } => {
let mut webpush = await_register.data.webpush.borrow_mut();
webpush.stats.registers += 1;
ServerMessage::Register {
Expand All @@ -969,9 +967,7 @@ where
push_endpoint: endpoint,
}
}
call::RegisterResponse::Error {
error_msg, status, ..
} => {
RegisterResponse::Error { error_msg, status } => {
debug!("Got unregister fail, error: {}", error_msg);
ServerMessage::Register {
channel_id: await_register.channel_id,
Expand Down
4 changes: 3 additions & 1 deletion autopush_rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@
//! aren't as relevant to the WebPush implementation.
//!
//! Otherwise be sure to check out each module for more documentation!
extern crate base64;
extern crate bytes;
extern crate cadence;
extern crate chrono;
extern crate fernet;
#[macro_use]
extern crate futures;
extern crate futures_backoff;
extern crate hex;
extern crate hostname;
extern crate httparse;
extern crate hyper;
Expand Down
42 changes: 42 additions & 0 deletions autopush_rs/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

use base64;
use cadence::StatsdClient;
use fernet::{Fernet, MultiFernet};
use futures::sync::oneshot;
use futures::task;
use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
use hex;
use hyper::server::Http;
use hyper::{self, header, StatusCode};
use libc::c_char;
use openssl::hash;
use openssl::ssl::SslAcceptor;
use reqwest;
use sentry;
Expand Down Expand Up @@ -86,6 +90,8 @@ pub struct AutopushServerOptions {
pub message_table_names: *const c_char,
pub router_table_name: *const c_char,
pub router_url: *const c_char,
pub endpoint_url: *const c_char,
pub crypto_key: *const c_char,
pub statsd_host: *const c_char,
pub statsd_port: u16,
pub megaphone_api_url: *const c_char,
Expand All @@ -109,6 +115,7 @@ pub struct ServerOptions {
pub debug: bool,
pub router_port: u16,
pub port: u16,
fernet: MultiFernet,
pub ssl_key: Option<PathBuf>,
pub ssl_cert: Option<PathBuf>,
pub ssl_dh_param: Option<PathBuf>,
Expand All @@ -120,6 +127,7 @@ pub struct ServerOptions {
pub message_table_names: Vec<String>,
pub router_table_name: String,
pub router_url: String,
pub endpoint_url: String,
pub statsd_host: Option<String>,
pub statsd_port: u16,
pub megaphone_api_url: Option<String>,
Expand Down Expand Up @@ -167,9 +175,18 @@ pub extern "C" fn autopush_server_new(
let opts = &*opts;

util::init_logging(opts.json_logging != 0);
let fernets: Vec<Fernet> = to_s(opts.crypto_key)
.map(|s| s.to_string())
.expect("crypto_key must be specified")
.split(",")
.map(|s| s.trim().to_string())
.map(|key| Fernet::new(&key).expect("Invalid key supplied"))
.collect();
let fernet = MultiFernet::new(fernets);
let mut opts = ServerOptions {
debug: opts.debug != 0,
port: opts.port,
fernet,
router_port: opts.router_port,
statsd_host: to_s(opts.statsd_host).map(|s| s.to_string()),
statsd_port: opts.statsd_port,
Expand All @@ -182,6 +199,8 @@ pub extern "C" fn autopush_server_new(
.expect("router table name must be specified"),
router_url: to_s(opts.router_url).map(|s| s.to_string())
.expect("router url must be specified"),
endpoint_url: to_s(opts.endpoint_url).map(|s| s.to_string())
.expect("endpoint url must be specified"),
ssl_key: to_s(opts.ssl_key).map(PathBuf::from),
ssl_cert: to_s(opts.ssl_cert).map(PathBuf::from),
ssl_dh_param: to_s(opts.ssl_dh_param).map(PathBuf::from),
Expand Down Expand Up @@ -485,6 +504,29 @@ impl Server {
Ok((srv2, core))
}

/// Create an v1 or v2 WebPush endpoint from the identifiers
///
/// Both endpoints use bytes instead of hex to reduce ID length.
// v1 is the uaid + chid
// v2 is the uaid + chid + sha256(key).bytes
pub fn make_endpoint(&self, uaid: &Uuid, chid: &Uuid, key: Option<String>) -> Result<String> {
let root = format!("{}/wpush/", self.opts.endpoint_url);
let mut base = hex::decode(uaid.simple().to_string()).chain_err(|| "Error decoding")?;
base.extend(hex::decode(chid.simple().to_string()).chain_err(|| "Error decoding")?);
if let Some(k) = key {
let raw_key = base64::decode_config(&k, base64::URL_SAFE)
.chain_err(|| "Error encrypting payload")?;
let key_digest = hash::hash(hash::MessageDigest::sha256(), &raw_key)
.chain_err(|| "Error creating message digest for key")?;
base.extend(key_digest.iter());
let encrypted = self.opts.fernet.encrypt(&base);
Ok(format!("{}v2/{}", root, encrypted).trim_matches('=').to_string())
} else {
let encrypted = self.opts.fernet.encrypt(&base);
Ok(format!("{}v1/{}", root, encrypted).trim_matches('=').to_string())
}
}

/// Informs this server that a new `client` has connected
///
/// For now just registers internal state by keeping track of the `client`,
Expand Down
Loading

0 comments on commit 35edd4f

Please sign in to comment.