Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: port hello command to Rust
Browse files Browse the repository at this point in the history
Co-authored by: Phil Jenvey <pjenvey@underboss.org>

Closes #1188
  • Loading branch information
bbangert committed May 8, 2018
1 parent 49ab486 commit e9af90b
Show file tree
Hide file tree
Showing 15 changed files with 410 additions and 128 deletions.
3 changes: 3 additions & 0 deletions autopush/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ class AutopushConfig(object):

allow_table_rotation = attrib(default=True) # type: bool

router_table_name = attrib(default=None) # type: Optional[str]
message_table_names = attrib(default=None) # type: Optional[str]

def __attrs_post_init__(self):
"""Initialize the Settings object"""
# Setup hosts/ports/urls
Expand Down
1 change: 1 addition & 0 deletions autopush/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ def from_argparse(cls, ns, resource=None): # pragma: nocover
cert=ns.router_ssl_cert,
dh_param=ns.ssl_dh_param
),
router_table_name=ns.router_tablename,
# XXX: default is for autopush_rs
auto_ping_interval=ns.auto_ping_interval or 300,
auto_ping_timeout=ns.auto_ping_timeout,
Expand Down
1 change: 0 additions & 1 deletion autopush/main_argparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ def parse_connection(config_files, args):
help="The megaphone API polling interval",
default=30, type=int,
env_var="MEGAPHONE_POLL_INTERVAL")

add_shared_args(parser)
return parser.parse_args(args)

Expand Down
25 changes: 2 additions & 23 deletions autopush/tests/test_rs_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class TestRustWebPush(unittest.TestCase):
router_table=dict(tablename=ROUTER_TABLE),
message_table=dict(tablename=MESSAGE_TABLE),
use_cryptography=True,
router_table_name=ROUTER_TABLE,
)

def start_ep(self, ep_conf):
Expand Down Expand Up @@ -224,28 +225,6 @@ def test_no_rotation(self):
self.start_conn(self._conn_conf)
yield self.shut_down(client)

@inlineCallbacks
def test_hello_only_has_three_calls(self):
db.TRACK_DB_CALLS = True
client = Client(self._ws_url)
yield client.connect()
result = yield client.hello()
assert result != {}
assert result["use_webpush"] is True

# Disconnect and reconnect to trigger storage check
yield client.disconnect()
yield client.connect()
result = yield client.hello()
assert result != {}
assert result["use_webpush"] is True
yield client.wait_for(lambda: len(db.DB_CALLS) == 2)
assert db.DB_CALLS == ['register_user', 'register_user']
db.DB_CALLS = []
db.TRACK_DB_CALLS = False

yield self.shut_down(client)

@inlineCallbacks
def test_hello_echo(self):
client = Client(self._ws_url)
Expand Down Expand Up @@ -684,7 +663,6 @@ def test_message_with_topic(self):
client = yield self.quick_register()
yield client.send_notification(data=data, topic="topicname")
self.conn.db.metrics.increment.assert_has_calls([
call('ua.command.hello'),
call('ua.command.register'),
# We can't see Rust metric calls
# call('ua.notification.topic')
Expand Down Expand Up @@ -814,6 +792,7 @@ class TestRustWebPushBroadcast(unittest.TestCase):
message_table=dict(tablename=MESSAGE_TABLE),
use_cryptography=True,
human_logs=False,
router_table_name=ROUTER_TABLE,
)

def setUp(self):
Expand Down
2 changes: 1 addition & 1 deletion autopush/webpush_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def __init__(self, conf, db, num_threads=10):
self.incoming = AutopushQueue()
self.workers = [] # type: List[Thread]
self.command_processor = CommandProcessor(conf, self.db)
self.rust = AutopushServer(conf, self.incoming)
self.rust = AutopushServer(conf, db.message_tables, self.incoming)
self.running = False

def start(self):
Expand Down
1 change: 1 addition & 0 deletions autopush_rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions autopush_rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ libc = "0.2.40"
log = { version = "0.4.1", features = ["max_level_trace", "release_max_level_warn"] }
matches = "0.1.6"
openssl = "0.10.5"
rand = "0.4"
regex = "0.2"
reqwest = { version = "0.8.5", features = ["unstable"] }
rusoto_core = "0.32.0"
Expand Down
7 changes: 5 additions & 2 deletions autopush_rs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ def free(obj, free_fn):


class AutopushServer(object):
def __init__(self, conf, queue):
# type: (AutopushConfig, AutopushQueue) -> AutopushServer
def __init__(self, conf, message_tables, queue):
# type: (AutopushConfig, List[str], AutopushQueue) -> AutopushServer
cfg = ffi.new('AutopushServerOptions*')
cfg.auto_ping_interval = conf.auto_ping_interval
cfg.auto_ping_timeout = conf.auto_ping_timeout
Expand All @@ -29,12 +29,15 @@ def __init__(self, conf, queue):
cfg.open_handshake_timeout = 5
cfg.port = conf.port
cfg.router_port = conf.router_port
cfg.router_url = ffi_from_buffer(conf.router_url)
cfg.ssl_cert = ffi_from_buffer(conf.ssl.cert)
cfg.ssl_dh_param = ffi_from_buffer(conf.ssl.dh_param)
cfg.ssl_key = ffi_from_buffer(conf.ssl.key)
cfg.json_logging = not conf.human_logs
cfg.statsd_host = ffi_from_buffer(conf.statsd_host)
cfg.statsd_port = conf.statsd_port
cfg.router_table_name = ffi_from_buffer(conf.router_table.tablename)
cfg.message_table_names = ffi_from_buffer(','.join(name.encode('utf-8') for name in message_tables))
cfg.megaphone_api_url = ffi_from_buffer(conf.megaphone_api_url)
cfg.megaphone_api_token = ffi_from_buffer(conf.megaphone_api_token)
cfg.megaphone_poll_interval = conf.megaphone_poll_interval
Expand Down
30 changes: 0 additions & 30 deletions autopush_rs/src/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use libc::c_char;
use serde::de;
use serde::ser;
use serde_json;
use uuid::Uuid;

use errors::*;
use rt::{self, AutopushError, UnwindGuard};
Expand Down Expand Up @@ -117,11 +116,6 @@ impl<F: FnOnce(&str) + Send> FnBox for F {
#[derive(Serialize)]
#[serde(tag = "command", rename_all = "snake_case")]
enum Call {
Hello {
connected_at: i64,
uaid: Option<String>,
},

Register {
uaid: String,
channel_id: String,
Expand Down Expand Up @@ -162,16 +156,6 @@ struct PythonError {
pub error_msg: String,
}

#[derive(Deserialize)]
pub struct HelloResponse {
pub uaid: Option<Uuid>,
pub message_month: String,
pub check_storage: bool,
pub reset_uaid: bool,
pub rotate_message_table: bool,
pub connected_at: u64,
}

#[derive(Deserialize)]
#[serde(untagged)]
pub enum RegisterResponse {
Expand Down Expand Up @@ -221,20 +205,6 @@ pub struct StoreMessagesResponse {
}

impl Server {
pub fn hello(&self, connected_at: &u64, uaid: Option<&Uuid>) -> MyFuture<HelloResponse> {
let ms = *connected_at as i64;
let (call, fut) = PythonCall::new(&Call::Hello {
connected_at: ms,
uaid: if let Some(uuid) = uaid {
Some(uuid.simple().to_string())
} else {
None
},
});
self.send_to_python(call);
return fut;
}

pub fn register(
&self,
uaid: String,
Expand Down
17 changes: 12 additions & 5 deletions autopush_rs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use errors::*;
use protocol::{ClientMessage, Notification, ServerMessage, ServerNotification};
use server::Server;
use util::{ms_since_epoch, parse_user_agent, sec_since_epoch};
use util::ddb_helpers::CheckStorageResponse;
use util::ddb_helpers::{CheckStorageResponse, HelloResponse};
use util::megaphone::{ClientServices, Service, ServiceClientInit};

// Created and handed to the AutopushServer
Expand Down Expand Up @@ -282,7 +282,7 @@ where

#[state_machine_future(transitions(AwaitSessionComplete))]
AwaitProcessHello {
response: MyFuture<call::HelloResponse>,
response: MyFuture<HelloResponse>,
data: UnAuthClientData<T>,
interested_broadcasts: Vec<Service>,
tx: mpsc::UnboundedSender<ServerNotification>,
Expand Down Expand Up @@ -337,7 +337,14 @@ where
let AwaitHello { data, tx, rx, .. } = hello.take();
let connected_at = ms_since_epoch();
transition!(AwaitProcessHello {
response: data.srv.hello(&connected_at, uaid.as_ref()),
response: data.srv.ddb.hello(
&connected_at,
uaid.as_ref(),
&data.srv.opts.router_table_name,
&data.srv.opts.router_url,
&data.srv.opts.message_table_names,
&data.srv.metrics,
),
data,
interested_broadcasts: services,
tx,
Expand All @@ -351,7 +358,7 @@ where
debug!("State: AwaitProcessHello");
let (uaid, message_month, check_storage, reset_uaid, rotate_message_table, connected_at) = {
match try_ready!(process_hello.response.poll()) {
call::HelloResponse {
HelloResponse {
uaid: Some(uaid),
message_month,
check_storage,
Expand All @@ -366,7 +373,7 @@ where
rotate_message_table,
connected_at,
),
call::HelloResponse { uaid: None, .. } => {
HelloResponse { uaid: None, .. } => {
return Err("Already connected elsewhere".into())
}
}
Expand Down
1 change: 1 addition & 0 deletions autopush_rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ extern crate libc;
#[macro_use]
extern crate matches;
extern crate openssl;
extern crate rand;
extern crate regex;
extern crate reqwest;
extern crate rusoto_core;
Expand Down
18 changes: 17 additions & 1 deletion autopush_rs/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ pub struct AutopushServerOptions {
pub max_connections: u32,
pub close_handshake_timeout: u32,
pub json_logging: i32,
pub message_table_names: *const c_char,
pub router_table_name: *const c_char,
pub router_url: *const c_char,
pub statsd_host: *const c_char,
pub statsd_port: u16,
pub megaphone_api_url: *const c_char,
Expand Down Expand Up @@ -114,6 +117,9 @@ pub struct ServerOptions {
pub auto_ping_timeout: Duration,
pub max_connections: Option<u32>,
pub close_handshake_timeout: Option<Duration>,
pub message_table_names: Vec<String>,
pub router_table_name: String,
pub router_url: String,
pub statsd_host: Option<String>,
pub statsd_port: u16,
pub megaphone_api_url: Option<String>,
Expand Down Expand Up @@ -161,12 +167,21 @@ pub extern "C" fn autopush_server_new(
let opts = &*opts;

util::init_logging(opts.json_logging != 0);
let opts = ServerOptions {
let mut opts = ServerOptions {
debug: opts.debug != 0,
port: opts.port,
router_port: opts.router_port,
statsd_host: to_s(opts.statsd_host).map(|s| s.to_string()),
statsd_port: opts.statsd_port,
message_table_names: to_s(opts.message_table_names).map(|s| s.to_string())
.expect("message table names must be specified")
.split(",")
.map(|s| s.trim().to_string())
.collect(),
router_table_name: to_s(opts.router_table_name).map(|s| s.to_string())
.expect("router table name must be specified"),
router_url: to_s(opts.router_url).map(|s| s.to_string())
.expect("router url must be specified"),
ssl_key: to_s(opts.ssl_key).map(PathBuf::from),
ssl_cert: to_s(opts.ssl_cert).map(PathBuf::from),
ssl_dh_param: to_s(opts.ssl_dh_param).map(PathBuf::from),
Expand All @@ -185,6 +200,7 @@ pub extern "C" fn autopush_server_new(
megaphone_poll_interval: ito_dur(opts.megaphone_poll_interval)
.expect("poll interval cannot be 0"),
};
opts.message_table_names.sort_unstable();

Box::new(AutopushServer {
inner: UnwindGuard::new(AutopushServerInner {
Expand Down
Loading

0 comments on commit e9af90b

Please sign in to comment.