Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: port hello command to Rust
Browse files Browse the repository at this point in the history
Co-authored by: Phil Jenvey <pjenvey@underboss.org>

Closes #1188
  • Loading branch information
bbangert committed May 9, 2018
1 parent 49ab486 commit 7648ed6
Show file tree
Hide file tree
Showing 14 changed files with 411 additions and 317 deletions.
3 changes: 3 additions & 0 deletions autopush/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ class AutopushConfig(object):

allow_table_rotation = attrib(default=True) # type: bool

router_table_name = attrib(default=None) # type: Optional[str]
message_table_names = attrib(default=None) # type: Optional[str]

def __attrs_post_init__(self):
"""Initialize the Settings object"""
# Setup hosts/ports/urls
Expand Down
23 changes: 0 additions & 23 deletions autopush/tests/test_rs_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,28 +224,6 @@ def test_no_rotation(self):
self.start_conn(self._conn_conf)
yield self.shut_down(client)

@inlineCallbacks
def test_hello_only_has_three_calls(self):
db.TRACK_DB_CALLS = True
client = Client(self._ws_url)
yield client.connect()
result = yield client.hello()
assert result != {}
assert result["use_webpush"] is True

# Disconnect and reconnect to trigger storage check
yield client.disconnect()
yield client.connect()
result = yield client.hello()
assert result != {}
assert result["use_webpush"] is True
yield client.wait_for(lambda: len(db.DB_CALLS) == 2)
assert db.DB_CALLS == ['register_user', 'register_user']
db.DB_CALLS = []
db.TRACK_DB_CALLS = False

yield self.shut_down(client)

@inlineCallbacks
def test_hello_echo(self):
client = Client(self._ws_url)
Expand Down Expand Up @@ -684,7 +662,6 @@ def test_message_with_topic(self):
client = yield self.quick_register()
yield client.send_notification(data=data, topic="topicname")
self.conn.db.metrics.increment.assert_has_calls([
call('ua.command.hello'),
call('ua.command.register'),
# We can't see Rust metric calls
# call('ua.notification.topic')
Expand Down
67 changes: 0 additions & 67 deletions autopush/tests/test_webpush_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
CheckStorage,
DeleteMessage,
DropUser,
Hello,
HelloResponse,
MigrateUser,
Register,
StoreMessages,
Expand Down Expand Up @@ -117,14 +115,6 @@ class Meta:
version = factory.LazyAttribute(generate_version)


class HelloFactory(factory.Factory):
class Meta:
model = Hello

uaid = factory.LazyFunction(lambda: uuid4().hex)
connected_at = factory.LazyFunction(lambda: int(time.time() * 1000))


class CheckStorageFactory(factory.Factory):
class Meta:
model = CheckStorage
Expand Down Expand Up @@ -211,63 +201,6 @@ def test_start_stop(self):
finally:
ws.stop()

def test_hello_process(self):
ws = self._makeFUT()
ws.start()
try:
hello = HelloFactory()
result = ws.command_processor.process_message(dict(
command="hello",
uaid=hello.uaid.hex,
connected_at=hello.connected_at,
))
assert "error" not in result
assert hello.uaid.hex != result["uaid"]
finally:
ws.stop()


class TestHelloProcessor(BaseSetup):
def _makeFUT(self):
from autopush.webpush_server import HelloCommand
return HelloCommand(self.conf, self.db)

def test_nonexisting_uaid(self):
p = self._makeFUT()
hello = HelloFactory()
result = p.process(hello) # type: HelloResponse
assert isinstance(result, HelloResponse)
assert hello.uaid != result.uaid
assert result.check_storage is False
assert result.connected_at == hello.connected_at
assert self.metrics.increment.called
assert self.metrics.increment.call_args[0][0] == 'ua.command.hello'

def test_existing_uaid(self):
p = self._makeFUT()
hello = HelloFactory()
success, _ = self.db.router.register_user(UserItemFactory(
uaid=hello.uaid.hex))
assert success is True
result = p.process(hello) # type: HelloResponse
assert isinstance(result, HelloResponse)
assert hello.uaid.hex == result.uaid
assert result.check_storage is True
assert result.connected_at == hello.connected_at
assert self.metrics.increment.called
assert self.metrics.increment.call_args[0][0] == 'ua.command.hello'

def test_existing_newer_uaid(self):
p = self._makeFUT()
hello = HelloFactory()
self.db.router.register_user(
UserItemFactory(uaid=hello.uaid.hex,
connected_at=hello.connected_at+10)
)
result = p.process(hello) # type: HelloResponse
assert isinstance(result, HelloResponse)
assert result.uaid is None


class TestDeleteMessageProcessor(BaseSetup):
def _makeFUT(self):
Expand Down
125 changes: 1 addition & 124 deletions autopush/webpush_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,6 @@ class InputCommand(object):
pass


@attrs(slots=True)
class Hello(InputCommand):
connected_at = attrib() # type: int
uaid = attrib(default=None, convert=uaid_from_str) # type: Optional[UUID]


@attrs(slots=True)
class CheckStorage(InputCommand):
uaid = attrib(convert=uaid_from_str) # type: UUID
Expand Down Expand Up @@ -176,16 +170,6 @@ class OutputCommand(object):
pass


@attrs(slots=True)
class HelloResponse(OutputCommand):
uaid = attrib() # type: Optional[str]
message_month = attrib() # type: str
check_storage = attrib() # type: bool
reset_uaid = attrib() # type: bool
connected_at = attrib() # type: int
rotate_message_table = attrib(default=False) # type: bool


@attrs(slots=True)
class CheckStorageResponse(OutputCommand):
include_topic = attrib() # type: bool
Expand Down Expand Up @@ -228,7 +212,7 @@ def __init__(self, conf, db, num_threads=10):
self.incoming = AutopushQueue()
self.workers = [] # type: List[Thread]
self.command_processor = CommandProcessor(conf, self.db)
self.rust = AutopushServer(conf, self.incoming)
self.rust = AutopushServer(conf, db.message_tables, self.incoming)
self.running = False

def start(self):
Expand Down Expand Up @@ -282,7 +266,6 @@ def __init__(self, conf, db):
# type: (AutopushConfig, DatabaseManager) -> None
self.conf = conf
self.db = db
self.hello_processor = HelloCommand(conf, db)
self.check_storage_processor = CheckStorageCommand(conf, db)
self.delete_message_processor = DeleteMessageCommand(conf, db)
self.drop_user_processor = DropUserCommand(conf, db)
Expand All @@ -291,7 +274,6 @@ def __init__(self, conf, db):
self.unregister_process = UnregisterCommand(conf, db)
self.store_messages_process = StoreMessagesUserCommand(conf, db)
self.deserialize = dict(
hello=Hello,
delete_message=DeleteMessage,
drop_user=DropUser,
migrate_user=MigrateUser,
Expand All @@ -300,7 +282,6 @@ def __init__(self, conf, db):
store_messages=StoreMessages,
)
self.command_dict = dict(
hello=self.hello_processor,
delete_message=self.delete_message_processor,
drop_user=self.drop_user_processor,
migrate_user=self.migrate_user_proocessor,
Expand Down Expand Up @@ -347,110 +328,6 @@ def process(self, command):
raise NotImplementedError()


class HelloCommand(ProcessorCommand):
def process(self, hello):
# type: (Hello) -> HelloResponse
user_item = None
flags = dict(
check_storage=False,
message_month=self.db.current_msg_month,
reset_uaid=False
)
if hello.uaid:
user_item, new_flags = self.lookup_user(hello)
if user_item:
# Only swap for the new flags if the user exists
flags = new_flags

if not user_item:
user_item = self.create_user(hello)

# Save the UAID as register_user removes it
uaid = user_item["uaid"] # type: str
success, _ = self.db.router.register_user(user_item)
flags["connected_at"] = hello.connected_at
if not success:
# User has already connected more recently elsewhere
return HelloResponse(uaid=None, **flags)

self.metrics.increment('ua.command.hello')
return HelloResponse(uaid=uaid, **flags)

def lookup_user(self, hello):
# type: (Hello) -> (Optional[JSONDict], JSONDict)
flags = dict(
message_month=None,
check_storage=False,
reset_uaid=False,
rotate_message_table=False,
)
uaid = hello.uaid.hex
try:
record = self.db.router.get_uaid(uaid)
except ItemNotFound:
return None, flags

# All records must have a router_type and connected_at, in some odd
# cases a record exists for some users without it
if "router_type" not in record or "connected_at" not in record:
self.drop_user(uaid, record, 104)
return None, flags

# Current month must exist and be a valid prior month
if ("current_month" not in record) or record["current_month"] \
not in self.db.message_tables:
self.drop_user(uaid, record, 105)
return None, flags

# If we got here, its a valid user that needs storage checked
flags["check_storage"] = True

# Determine if message table rotation is needed
flags["message_month"] = record["current_month"]
if record["current_month"] != self.db.current_msg_month:
flags["rotate_message_table"] = True

# Include and update last_connect if needed, otherwise exclude
if has_connected_this_month(record):
del record["last_connect"]
else:
record["last_connect"] = generate_last_connect()

# Determine if this is missing a record version
if ("record_version" not in record or
int(record["record_version"]) < USER_RECORD_VERSION):
flags["reset_uaid"] = True

# Update the node_id, connected_at for this node/connected_at
record["node_id"] = self.conf.router_url
record["connected_at"] = hello.connected_at
return record, flags

def create_user(self, hello):
# type: (Hello) -> JSONDict
return dict(
uaid=uuid4().hex,
node_id=self.conf.router_url,
connected_at=hello.connected_at,
router_type="webpush",
last_connect=generate_last_connect(),
record_version=USER_RECORD_VERSION,
current_month=self.db.current_msg_month,
)

def drop_user(self, uaid, uaid_record, code):
# type: (str, dict, int) -> None
"""Drop a user record"""
log.debug(
"Dropping User",
code=code,
uaid_hash=hasher(uaid),
uaid_record=repr(uaid_record)
)
self.metrics.increment('ua.expiration', tags=['code:{}'.format(code)])
self.db.router.drop_user(uaid)


class CheckStorageCommand(ProcessorCommand):
def process(self, command):
# type: (CheckStorage) -> CheckStorageResponse
Expand Down
1 change: 1 addition & 0 deletions autopush_rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions autopush_rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ libc = "0.2.40"
log = { version = "0.4.1", features = ["max_level_trace", "release_max_level_warn"] }
matches = "0.1.6"
openssl = "0.10.5"
rand = "0.4"
regex = "0.2"
reqwest = { version = "0.8.5", features = ["unstable"] }
rusoto_core = "0.32.0"
Expand Down
9 changes: 7 additions & 2 deletions autopush_rs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ def free(obj, free_fn):


class AutopushServer(object):
def __init__(self, conf, queue):
# type: (AutopushConfig, AutopushQueue) -> AutopushServer
def __init__(self, conf, message_tables, queue):
# type: (AutopushConfig, List[str], AutopushQueue) -> AutopushServer
cfg = ffi.new('AutopushServerOptions*')
cfg.auto_ping_interval = conf.auto_ping_interval
cfg.auto_ping_timeout = conf.auto_ping_timeout
Expand All @@ -29,12 +29,17 @@ def __init__(self, conf, queue):
cfg.open_handshake_timeout = 5
cfg.port = conf.port
cfg.router_port = conf.router_port
cfg.router_url = ffi_from_buffer(conf.router_url)
cfg.ssl_cert = ffi_from_buffer(conf.ssl.cert)
cfg.ssl_dh_param = ffi_from_buffer(conf.ssl.dh_param)
cfg.ssl_key = ffi_from_buffer(conf.ssl.key)
cfg.json_logging = not conf.human_logs
cfg.statsd_host = ffi_from_buffer(conf.statsd_host)
cfg.statsd_port = conf.statsd_port
cfg.router_table_name = ffi_from_buffer(conf.router_table.tablename)
# XXX: keepalive
self.message_table_names = ','.join(name.encode('utf-8') for name in message_tables)
cfg.message_table_names = ffi_from_buffer(self.message_table_names)
cfg.megaphone_api_url = ffi_from_buffer(conf.megaphone_api_url)
cfg.megaphone_api_token = ffi_from_buffer(conf.megaphone_api_token)
cfg.megaphone_poll_interval = conf.megaphone_poll_interval
Expand Down
Loading

0 comments on commit 7648ed6

Please sign in to comment.