Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: port hello command to Rust
Browse files Browse the repository at this point in the history
Co-authored by: Phil Jenvey <pjenvey@underboss.org>

Closes #1188
  • Loading branch information
bbangert committed May 9, 2018
1 parent 49ab486 commit 1b48a47
Show file tree
Hide file tree
Showing 16 changed files with 441 additions and 384 deletions.
1 change: 0 additions & 1 deletion autopush/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
Tuple,
Union,
)
from twisted.internet.defer import Deferred # noqa
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.threads import deferToThread

Expand Down
26 changes: 0 additions & 26 deletions autopush/tests/test_rs_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import uuid
from contextlib import contextmanager
from http.server import BaseHTTPRequestHandler, HTTPServer
from httplib import HTTPResponse # noqa
from mock import Mock, call, patch
from threading import Thread, Event
from unittest.case import SkipTest
Expand All @@ -26,12 +25,10 @@
import requests
import twisted.internet.base
from cryptography.fernet import Fernet
from typing import Optional # noqa
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.trial import unittest
from twisted.logger import globalLogPublisher

import autopush.db as db
import autopush.tests
from autopush.config import AutopushConfig
from autopush.logging import begin_or_register
Expand Down Expand Up @@ -224,28 +221,6 @@ def test_no_rotation(self):
self.start_conn(self._conn_conf)
yield self.shut_down(client)

@inlineCallbacks
def test_hello_only_has_three_calls(self):
db.TRACK_DB_CALLS = True
client = Client(self._ws_url)
yield client.connect()
result = yield client.hello()
assert result != {}
assert result["use_webpush"] is True

# Disconnect and reconnect to trigger storage check
yield client.disconnect()
yield client.connect()
result = yield client.hello()
assert result != {}
assert result["use_webpush"] is True
yield client.wait_for(lambda: len(db.DB_CALLS) == 2)
assert db.DB_CALLS == ['register_user', 'register_user']
db.DB_CALLS = []
db.TRACK_DB_CALLS = False

yield self.shut_down(client)

@inlineCallbacks
def test_hello_echo(self):
client = Client(self._ws_url)
Expand Down Expand Up @@ -684,7 +659,6 @@ def test_message_with_topic(self):
client = yield self.quick_register()
yield client.send_notification(data=data, topic="topicname")
self.conn.db.metrics.increment.assert_has_calls([
call('ua.command.hello'),
call('ua.command.register'),
# We can't see Rust metric calls
# call('ua.notification.topic')
Expand Down
67 changes: 0 additions & 67 deletions autopush/tests/test_webpush_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
CheckStorage,
DeleteMessage,
DropUser,
Hello,
HelloResponse,
MigrateUser,
Register,
StoreMessages,
Expand Down Expand Up @@ -117,14 +115,6 @@ class Meta:
version = factory.LazyAttribute(generate_version)


class HelloFactory(factory.Factory):
class Meta:
model = Hello

uaid = factory.LazyFunction(lambda: uuid4().hex)
connected_at = factory.LazyFunction(lambda: int(time.time() * 1000))


class CheckStorageFactory(factory.Factory):
class Meta:
model = CheckStorage
Expand Down Expand Up @@ -211,63 +201,6 @@ def test_start_stop(self):
finally:
ws.stop()

def test_hello_process(self):
ws = self._makeFUT()
ws.start()
try:
hello = HelloFactory()
result = ws.command_processor.process_message(dict(
command="hello",
uaid=hello.uaid.hex,
connected_at=hello.connected_at,
))
assert "error" not in result
assert hello.uaid.hex != result["uaid"]
finally:
ws.stop()


class TestHelloProcessor(BaseSetup):
def _makeFUT(self):
from autopush.webpush_server import HelloCommand
return HelloCommand(self.conf, self.db)

def test_nonexisting_uaid(self):
p = self._makeFUT()
hello = HelloFactory()
result = p.process(hello) # type: HelloResponse
assert isinstance(result, HelloResponse)
assert hello.uaid != result.uaid
assert result.check_storage is False
assert result.connected_at == hello.connected_at
assert self.metrics.increment.called
assert self.metrics.increment.call_args[0][0] == 'ua.command.hello'

def test_existing_uaid(self):
p = self._makeFUT()
hello = HelloFactory()
success, _ = self.db.router.register_user(UserItemFactory(
uaid=hello.uaid.hex))
assert success is True
result = p.process(hello) # type: HelloResponse
assert isinstance(result, HelloResponse)
assert hello.uaid.hex == result.uaid
assert result.check_storage is True
assert result.connected_at == hello.connected_at
assert self.metrics.increment.called
assert self.metrics.increment.call_args[0][0] == 'ua.command.hello'

def test_existing_newer_uaid(self):
p = self._makeFUT()
hello = HelloFactory()
self.db.router.register_user(
UserItemFactory(uaid=hello.uaid.hex,
connected_at=hello.connected_at+10)
)
result = p.process(hello) # type: HelloResponse
assert isinstance(result, HelloResponse)
assert result.uaid is None


class TestDeleteMessageProcessor(BaseSetup):
def _makeFUT(self):
Expand Down
3 changes: 1 addition & 2 deletions autopush/web/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from typing import ( # noqa
Optional,
Set,
Tuple,
Dict
Tuple
)

import simplejson as json
Expand Down
133 changes: 3 additions & 130 deletions autopush/webpush_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""
from threading import Thread
from uuid import UUID, uuid4
from uuid import UUID

import attr
from attr import (
Expand All @@ -21,20 +21,16 @@

from autopush.db import ( # noqa
DatabaseManager,
has_connected_this_month,
hasher,
generate_last_connect,
Message,
)

from autopush.config import AutopushConfig # noqa
from autopush.exceptions import ItemNotFound
from autopush.metrics import IMetrics # noqa
from autopush.web.webpush import MAX_TTL
from autopush.types import JSONDict # noqa
from autopush.utils import WebPushNotification
from autopush.websocket import USER_RECORD_VERSION
from autopush_rs import AutopushCall, AutopushServer, AutopushQueue # noqa
from autopush_rs import AutopushServer, AutopushQueue # noqa

log = Logger()

Expand Down Expand Up @@ -129,12 +125,6 @@ class InputCommand(object):
pass


@attrs(slots=True)
class Hello(InputCommand):
connected_at = attrib() # type: int
uaid = attrib(default=None, convert=uaid_from_str) # type: Optional[UUID]


@attrs(slots=True)
class CheckStorage(InputCommand):
uaid = attrib(convert=uaid_from_str) # type: UUID
Expand Down Expand Up @@ -176,16 +166,6 @@ class OutputCommand(object):
pass


@attrs(slots=True)
class HelloResponse(OutputCommand):
uaid = attrib() # type: Optional[str]
message_month = attrib() # type: str
check_storage = attrib() # type: bool
reset_uaid = attrib() # type: bool
connected_at = attrib() # type: int
rotate_message_table = attrib(default=False) # type: bool


@attrs(slots=True)
class CheckStorageResponse(OutputCommand):
include_topic = attrib() # type: bool
Expand Down Expand Up @@ -228,7 +208,7 @@ def __init__(self, conf, db, num_threads=10):
self.incoming = AutopushQueue()
self.workers = [] # type: List[Thread]
self.command_processor = CommandProcessor(conf, self.db)
self.rust = AutopushServer(conf, self.incoming)
self.rust = AutopushServer(conf, db.message_tables, self.incoming)
self.running = False

def start(self):
Expand Down Expand Up @@ -282,7 +262,6 @@ def __init__(self, conf, db):
# type: (AutopushConfig, DatabaseManager) -> None
self.conf = conf
self.db = db
self.hello_processor = HelloCommand(conf, db)
self.check_storage_processor = CheckStorageCommand(conf, db)
self.delete_message_processor = DeleteMessageCommand(conf, db)
self.drop_user_processor = DropUserCommand(conf, db)
Expand All @@ -291,7 +270,6 @@ def __init__(self, conf, db):
self.unregister_process = UnregisterCommand(conf, db)
self.store_messages_process = StoreMessagesUserCommand(conf, db)
self.deserialize = dict(
hello=Hello,
delete_message=DeleteMessage,
drop_user=DropUser,
migrate_user=MigrateUser,
Expand All @@ -300,7 +278,6 @@ def __init__(self, conf, db):
store_messages=StoreMessages,
)
self.command_dict = dict(
hello=self.hello_processor,
delete_message=self.delete_message_processor,
drop_user=self.drop_user_processor,
migrate_user=self.migrate_user_proocessor,
Expand Down Expand Up @@ -347,110 +324,6 @@ def process(self, command):
raise NotImplementedError()


class HelloCommand(ProcessorCommand):
def process(self, hello):
# type: (Hello) -> HelloResponse
user_item = None
flags = dict(
check_storage=False,
message_month=self.db.current_msg_month,
reset_uaid=False
)
if hello.uaid:
user_item, new_flags = self.lookup_user(hello)
if user_item:
# Only swap for the new flags if the user exists
flags = new_flags

if not user_item:
user_item = self.create_user(hello)

# Save the UAID as register_user removes it
uaid = user_item["uaid"] # type: str
success, _ = self.db.router.register_user(user_item)
flags["connected_at"] = hello.connected_at
if not success:
# User has already connected more recently elsewhere
return HelloResponse(uaid=None, **flags)

self.metrics.increment('ua.command.hello')
return HelloResponse(uaid=uaid, **flags)

def lookup_user(self, hello):
# type: (Hello) -> (Optional[JSONDict], JSONDict)
flags = dict(
message_month=None,
check_storage=False,
reset_uaid=False,
rotate_message_table=False,
)
uaid = hello.uaid.hex
try:
record = self.db.router.get_uaid(uaid)
except ItemNotFound:
return None, flags

# All records must have a router_type and connected_at, in some odd
# cases a record exists for some users without it
if "router_type" not in record or "connected_at" not in record:
self.drop_user(uaid, record, 104)
return None, flags

# Current month must exist and be a valid prior month
if ("current_month" not in record) or record["current_month"] \
not in self.db.message_tables:
self.drop_user(uaid, record, 105)
return None, flags

# If we got here, its a valid user that needs storage checked
flags["check_storage"] = True

# Determine if message table rotation is needed
flags["message_month"] = record["current_month"]
if record["current_month"] != self.db.current_msg_month:
flags["rotate_message_table"] = True

# Include and update last_connect if needed, otherwise exclude
if has_connected_this_month(record):
del record["last_connect"]
else:
record["last_connect"] = generate_last_connect()

# Determine if this is missing a record version
if ("record_version" not in record or
int(record["record_version"]) < USER_RECORD_VERSION):
flags["reset_uaid"] = True

# Update the node_id, connected_at for this node/connected_at
record["node_id"] = self.conf.router_url
record["connected_at"] = hello.connected_at
return record, flags

def create_user(self, hello):
# type: (Hello) -> JSONDict
return dict(
uaid=uuid4().hex,
node_id=self.conf.router_url,
connected_at=hello.connected_at,
router_type="webpush",
last_connect=generate_last_connect(),
record_version=USER_RECORD_VERSION,
current_month=self.db.current_msg_month,
)

def drop_user(self, uaid, uaid_record, code):
# type: (str, dict, int) -> None
"""Drop a user record"""
log.debug(
"Dropping User",
code=code,
uaid_hash=hasher(uaid),
uaid_record=repr(uaid_record)
)
self.metrics.increment('ua.expiration', tags=['code:{}'.format(code)])
self.db.router.drop_user(uaid)


class CheckStorageCommand(ProcessorCommand):
def process(self, command):
# type: (CheckStorage) -> CheckStorageResponse
Expand Down
1 change: 0 additions & 1 deletion autopush/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@
from autopush.protocol import IgnoreBody
from autopush.metrics import IMetrics, make_tags # noqa
from autopush.ssl import AutopushSSLContextFactory # noqa
from autopush.types import JSONDict # noqa
from autopush.utils import (
parse_user_agent,
validate_uaid,
Expand Down
1 change: 1 addition & 0 deletions autopush_rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions autopush_rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ libc = "0.2.40"
log = { version = "0.4.1", features = ["max_level_trace", "release_max_level_warn"] }
matches = "0.1.6"
openssl = "0.10.5"
rand = "0.4"
regex = "0.2"
reqwest = { version = "0.8.5", features = ["unstable"] }
rusoto_core = "0.32.0"
Expand Down
Loading

0 comments on commit 1b48a47

Please sign in to comment.