diff --git a/autopush/tests/test_webpush_server.py b/autopush/tests/test_webpush_server.py index f4f2ad56..86088cb8 100644 --- a/autopush/tests/test_webpush_server.py +++ b/autopush/tests/test_webpush_server.py @@ -27,7 +27,6 @@ DeleteMessage, MigrateUser, StoreMessages, - Unregister, WebPushMessage, ) import autopush.tests @@ -282,40 +281,6 @@ def test_no_migrate(self): assert db.message.tablename == tablename -class TestUnregisterProcessor(BaseSetup): - - def _makeFUT(self): - from autopush.webpush_server import UnregisterCommand - return UnregisterCommand(self.conf, self.db) - - def test_unregister(self): - cmd = self._makeFUT() - chid = str(uuid4()) - result = cmd.process(Unregister( - uaid=uuid4().hex, - channel_id=chid, - message_month=self.db.current_msg_month) - ) - assert result.success - assert self.metrics.increment.called - assert self.metrics.increment.call_args[0][0] == \ - 'ua.command.unregister' - assert self.logs.logged( - lambda e: (e['log_format'] == "Unregister" and - e['channel_id'] == chid) - ) - - def test_unregister_bad_chid(self): - cmd = self._makeFUT() - result = cmd.process(Unregister( - uaid=uuid4().hex, - channel_id="quux", - message_month=self.db.current_msg_month) - ) - assert result.error - assert "Invalid UUID" in result.error_msg - - class TestStoreMessagesProcessor(BaseSetup): def _makeFUT(self): from autopush.webpush_server import StoreMessagesUserCommand diff --git a/autopush/webpush_server.py b/autopush/webpush_server.py index 1a95566b..8d1f072b 100644 --- a/autopush/webpush_server.py +++ b/autopush/webpush_server.py @@ -9,19 +9,16 @@ attrs, attrib, ) -from botocore.exceptions import ClientError from typing import ( # noqa Dict, List, Optional, - Tuple, - Union + Tuple ) from twisted.logger import Logger from autopush.db import ( # noqa DatabaseManager, - hasher, Message, ) @@ -255,18 +252,15 @@ def __init__(self, conf, db): self.check_storage_processor = CheckStorageCommand(conf, db) self.delete_message_processor = DeleteMessageCommand(conf, db) self.migrate_user_proocessor = MigrateUserCommand(conf, db) - self.unregister_process = UnregisterCommand(conf, db) self.store_messages_process = StoreMessagesUserCommand(conf, db) self.deserialize = dict( delete_message=DeleteMessage, migrate_user=MigrateUser, - unregister=Unregister, store_messages=StoreMessages, ) self.command_dict = dict( delete_message=self.delete_message_processor, migrate_user=self.migrate_user_proocessor, - unregister=self.unregister_process, store_messages=self.store_messages_process, ) # type: Dict[str, ProcessorCommand] @@ -409,60 +403,3 @@ def _validate_chid(chid): if chid != str(result): return False, "Bad UUID format, use lower case, dashed format" return True, None - - -@attrs(slots=True) -class Unregister(InputCommand): - channel_id = attrib() # type: str - uaid = attrib(convert=uaid_from_str) # type: Optional[UUID] - message_month = attrib() # type: str - code = attrib(default=None) # type: int - - -@attrs(slots=True) -class UnregisterResponse(OutputCommand): - success = attrib(default=True) # type: bool - - -@attrs(slots=True) -class UnregisterErrorResponse(OutputCommand): - error_msg = attrib() # type: str - error = attrib(default=True) # type: bool - status = attrib(default=401) # type: int - - -class UnregisterCommand(ProcessorCommand): - - def process(self, - command # type: Unregister - ): - # type: (...) -> Union[UnregisterResponse, UnregisterErrorResponse] - valid, msg = _validate_chid(command.channel_id) - if not valid: - return UnregisterErrorResponse(error_msg=msg) - - message = Message(command.message_month, - boto_resource=self.db.resource) - try: - message.unregister_channel(command.uaid.hex, command.channel_id) - except ClientError as ex: # pragma: nocover - # Since this operates in a separate thread than the tests, - # we can't mock out the unregister_channel call inside - # test_webpush_server, thus the # nocover. - log.error("Unregister failed", - channel_id=command.channel_id, - uaid_hash=hasher(command.uaid.hex), - exeption=ex) - return UnregisterErrorResponse(error_msg="Unregister failed") - - # TODO: Clear out any existing tracked messages for this channel - - self.metrics.increment('ua.command.unregister') - # TODO: user/raw_agent? - log.info( - "Unregister", - channel_id=command.channel_id, - uaid_hash=hasher(command.uaid.hex), - **dict(code=command.code) if command.code else {} - ) - return UnregisterResponse() diff --git a/autopush_rs/src/call.rs b/autopush_rs/src/call.rs index 088b82db..3cdc9c91 100644 --- a/autopush_rs/src/call.rs +++ b/autopush_rs/src/call.rs @@ -116,13 +116,6 @@ impl FnBox for F { #[derive(Serialize)] #[serde(tag = "command", rename_all = "snake_case")] enum Call { - Unregister { - uaid: String, - channel_id: String, - message_month: String, - code: i32, - }, - DeleteMessage { message: protocol::Notification, message_month: String, @@ -145,20 +138,6 @@ struct PythonError { pub error_msg: String, } -#[derive(Deserialize)] -#[serde(untagged)] -pub enum UnRegisterResponse { - Success { - success: bool, - }, - - Error { - error_msg: String, - error: bool, - status: u32, - }, -} - #[derive(Deserialize)] pub struct DeleteMessageResponse { pub success: bool, @@ -175,23 +154,6 @@ pub struct StoreMessagesResponse { } impl Server { - pub fn unregister( - &self, - uaid: String, - message_month: String, - channel_id: String, - code: i32, - ) -> MyFuture { - let (call, fut) = PythonCall::new(&Call::Unregister { - uaid: uaid, - message_month: message_month, - channel_id: channel_id, - code: code, - }); - self.send_to_python(call); - return fut; - } - pub fn delete_message( &self, message_month: String, diff --git a/autopush_rs/src/client.rs b/autopush_rs/src/client.rs index a223839e..b366ee31 100644 --- a/autopush_rs/src/client.rs +++ b/autopush_rs/src/client.rs @@ -25,9 +25,9 @@ use call; use errors::*; use protocol::{ClientMessage, Notification, ServerMessage, ServerNotification}; use server::Server; -use util::{ms_since_epoch, parse_user_agent, sec_since_epoch}; use util::ddb_helpers::{CheckStorageResponse, HelloResponse, RegisterResponse}; use util::megaphone::{ClientServices, Service, ServiceClientInit}; +use util::{ms_since_epoch, parse_user_agent, sec_since_epoch}; // Created and handed to the AutopushServer pub struct RegisteredClient { @@ -606,7 +606,7 @@ where #[state_machine_future(transitions(SendThenWait))] AwaitUnregister { channel_id: Uuid, - response: MyFuture, + response: MyFuture, data: AuthClientData, }, @@ -734,7 +734,6 @@ where } } Either::A(ClientMessage::Register { channel_id, key }) => { - data.srv.metrics.incr("ua.command.register").ok(); debug!("Got a register command"; "channel_id" => channel_id.hyphenated().to_string()); let uaid = webpush.uaid.clone(); @@ -753,12 +752,12 @@ where debug!("Got a unregister command"); let uaid = webpush.uaid.clone(); let message_month = webpush.message_month.clone(); - let channel_id_str = channel_id.hyphenated().to_string(); - let fut = data.srv.unregister( - uaid.simple().to_string(), - message_month, - channel_id_str, + let fut = data.srv.ddb.unregister( + &uaid, + &channel_id, + &message_month, code.unwrap_or(200), + &data.srv.metrics, ); transition!(AwaitUnregister { channel_id, @@ -961,6 +960,12 @@ where let msg = match try_ready!(await_register.response.poll()) { RegisterResponse::Success { endpoint } => { let mut webpush = await_register.data.webpush.borrow_mut(); + await_register + .data + .srv + .metrics + .incr("ua.command.register") + .ok(); webpush.stats.registers += 1; ServerMessage::Register { channel_id: await_register.channel_id, @@ -989,24 +994,19 @@ where await_unregister: &'a mut RentToOwn<'a, AwaitUnregister>, ) -> Poll, Error> { debug!("State: AwaitUnRegister"); - let msg = match try_ready!(await_unregister.response.poll()) { - call::UnRegisterResponse::Success { success } => { - debug!("Got the unregister response"); - let mut webpush = await_unregister.data.webpush.borrow_mut(); - webpush.stats.unregisters += 1; - ServerMessage::Unregister { - channel_id: await_unregister.channel_id, - status: if success { 200 } else { 500 }, - } + let msg = if try_ready!(await_unregister.response.poll()) { + debug!("Got the unregister response"); + let mut webpush = await_unregister.data.webpush.borrow_mut(); + webpush.stats.unregisters += 1; + ServerMessage::Unregister { + channel_id: await_unregister.channel_id, + status: 200, } - call::UnRegisterResponse::Error { - error_msg, status, .. - } => { - debug!("Got unregister fail, error: {}", error_msg); - ServerMessage::Unregister { - channel_id: await_unregister.channel_id, - status, - } + } else { + debug!("Got unregister fail"); + ServerMessage::Unregister { + channel_id: await_unregister.channel_id, + status: 500, } }; diff --git a/autopush_rs/src/protocol.rs b/autopush_rs/src/protocol.rs index 0563e773..5df9050c 100644 --- a/autopush_rs/src/protocol.rs +++ b/autopush_rs/src/protocol.rs @@ -45,7 +45,7 @@ pub enum ClientMessage { Unregister { #[serde(rename = "channelID")] channel_id: Uuid, - code: Option, + code: Option, }, BroadcastSubscribe { diff --git a/autopush_rs/src/util/ddb_helpers.rs b/autopush_rs/src/util/ddb_helpers.rs index ac4f1d14..a582792a 100644 --- a/autopush_rs/src/util/ddb_helpers.rs +++ b/autopush_rs/src/util/ddb_helpers.rs @@ -1,6 +1,6 @@ /// DynamoDB Client helpers -use std::env; use std::collections::{HashMap, HashSet}; +use std::env; use std::rc::Rc; use std::result::Result as StdResult; use uuid::Uuid; @@ -23,9 +23,9 @@ use rusoto_dynamodb::{AttributeValue, DeleteItemError, DeleteItemInput, DeleteIt use serde::Serializer; use serde_dynamodb; +use errors::*; use protocol::Notification; use server::Server; -use errors::*; use util::timing::{ms_since_epoch, sec_since_epoch}; const MAX_EXPIRY: u64 = 2592000; @@ -67,37 +67,27 @@ macro_rules! key_schema { } macro_rules! val { - (B => $val:expr) => ( - { - let mut attr = AttributeValue::default(); - attr.b = Some($val); - attr - } - ); - (S => $val:expr) => ( - { - let mut attr = AttributeValue::default(); - attr.s = Some($val.to_string()); - attr - } - ); - (SS => $val:expr) => ( - { - let mut attr = AttributeValue::default(); - let vals: Vec = $val.iter() - .map(|v| v.to_string()) - .collect(); - attr.ss = Some(vals); - attr - } - ); - (N => $val:expr) => ( - { - let mut attr = AttributeValue::default(); - attr.n = Some($val.to_string()); - attr - } - ); + (B => $val:expr) => {{ + let mut attr = AttributeValue::default(); + attr.b = Some($val); + attr + }}; + (S => $val:expr) => {{ + let mut attr = AttributeValue::default(); + attr.s = Some($val.to_string()); + attr + }}; + (SS => $val:expr) => {{ + let mut attr = AttributeValue::default(); + let vals: Vec = $val.iter().map(|v| v.to_string()).collect(); + attr.ss = Some(vals); + attr + }}; + (N => $val:expr) => {{ + let mut attr = AttributeValue::default(); + attr.n = Some($val.to_string()); + attr + }}; } /// Create a **HashMap** from a list of key-value pairs @@ -302,11 +292,8 @@ struct RangeKey { fn parse_sort_key(key: &str) -> Result { lazy_static! { - static ref RE: RegexSet = RegexSet::new(&[ - r"^01:\S+:\S+$", - r"^02:\d+:\S+$", - r"^\S{3,}:\S+$", - ]).unwrap(); + static ref RE: RegexSet = + RegexSet::new(&[r"^01:\S+:\S+$", r"^02:\d+:\S+$", r"^\S{3,}:\S+$",]).unwrap(); } if !RE.is_match(key) { return Err("Invalid chidmessageid".into()).into(); @@ -678,6 +665,35 @@ impl DynamoStorage { Box::new(ddb_response) } + fn unregister_channel_id( + ddb: Rc>, + uaid: &Uuid, + channel_id: &Uuid, + message_table_name: &str, + ) -> MyFuture { + let chid = channel_id.hyphenated().to_string(); + let attr_values = hashmap! { + ":channel_id".to_string() => val!(SS => vec![chid]), + }; + let update_item = UpdateItemInput { + key: ddb_item! { + uaid: s => uaid.simple().to_string(), + chidmessageid: s => " ".to_string() + }, + update_expression: Some("DELETE chids :channel_id".to_string()), + expression_attribute_values: Some(attr_values), + table_name: message_table_name.to_string(), + ..Default::default() + }; + let ddb_response = retry_if( + move || ddb.update_item(&update_item), + |err: &UpdateItemError| { + matches!(err, &UpdateItemError::ProvisionedThroughputExceeded(_)) + }, + ).chain_err(|| "Error unregistering channel"); + Box::new(ddb_response) + } + fn lookup_user( ddb: Rc>, uaid: &Uuid, @@ -869,6 +885,26 @@ impl DynamoStorage { Box::new(response) } + pub fn unregister( + &self, + uaid: &Uuid, + channel_id: &Uuid, + message_month: &str, + code: u32, + metrics: &StatsdClient, + ) -> MyFuture { + let ddb = self.ddb.clone(); + let response = DynamoStorage::unregister_channel_id(ddb, uaid, channel_id, message_month) + .and_then(move |_| -> MyFuture<_> { Box::new(future::ok(true)) }) + .or_else(move |_| -> MyFuture<_> { Box::new(future::ok(false)) }); + metrics + .incr_with_tags("ua.command.unregister") + .with_tag("code", &code.to_string()) + .send() + .ok(); + Box::new(response) + } + pub fn check_storage( &self, table_name: &str, @@ -932,10 +968,10 @@ impl DynamoStorage { #[cfg(test)] mod tests { + use super::parse_sort_key; use chrono::prelude::*; - use uuid::Uuid; use util::us_since_epoch; - use super::parse_sort_key; + use uuid::Uuid; #[test] fn test_parse_sort_key_ver1() {