Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1159 from mozilla-services/feat/issue-1129.4
Browse files Browse the repository at this point in the history
feat: add megaphone integration tests
  • Loading branch information
bbangert authored Mar 16, 2018
2 parents 9fc21ba + b2132f6 commit 2188fe0
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 26 deletions.
1 change: 1 addition & 0 deletions autopush/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class AutopushConfig(object):
convert=_init_crypto_key, default=None) # type: List[str]

bear_hash_key = attrib(default=Factory(list)) # type: List[str]
human_logs = attrib(default=True) # type: bool

hostname = attrib(default=None) # type: Optional[str]
port = attrib(default=None) # type: Optional[int]
Expand Down
3 changes: 2 additions & 1 deletion autopush/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ def setup(self, rotate_tables=True, num_threads=10):
self.push_server = WebPushServer(
self.conf,
self.db,
num_threads=num_threads
num_threads=num_threads,
)

def run(self): # pragma: nocover
Expand Down Expand Up @@ -381,5 +381,6 @@ def from_argparse(cls, ns, resource=None): # pragma: nocover
close_handshake_timeout=ns.close_handshake_timeout,
aws_ddb_endpoint=ns.aws_ddb_endpoint,
megaphone_api_url=ns.megaphone_api_url,
megaphone_poll_interval=ns.megaphone_poll_interval,
resource=resource
)
24 changes: 23 additions & 1 deletion autopush/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def connect(self):
self.ws = websocket.create_connection(self.url)
return self.ws.connected

def hello(self, uaid=None):
def hello(self, uaid=None, services=None):
if self.channels:
chans = self.channels.keys()
else:
Expand All @@ -114,6 +114,8 @@ def hello(self, uaid=None):
channelIDs=chans)
if uaid or self.uaid:
hello_dict["uaid"] = uaid or self.uaid
if services:
hello_dict["broadcasts"] = services
msg = json.dumps(hello_dict)
log.debug("Send: %s", msg)
self.ws.send(msg)
Expand All @@ -127,6 +129,12 @@ def hello(self, uaid=None):
self.uaid = result["uaid"]
return result

def broadcast_subscribe(self, services):
msg = json.dumps(dict(messageType="broadcast_subscribe",
broadcasts=services))
log.debug("Send: %s", msg)
self.ws.send(msg)

def register(self, chid=None, key=None):
chid = chid or str(uuid.uuid4())
msg = json.dumps(dict(messageType="register",
Expand Down Expand Up @@ -244,6 +252,20 @@ def get_notification(self, timeout=1):
finally:
self.ws.settimeout(orig_timeout)

def get_broadcast(self, timeout=1):
orig_timeout = self.ws.gettimeout()
self.ws.settimeout(timeout)
try:
d = self.ws.recv()
log.debug("Recv: %s", d)
result = json.loads(d)
assert result.get("messageType") == "broadcast"
return result
except Exception: # pragma: nocover
return None
finally:
self.ws.settimeout(orig_timeout)

def ping(self):
log.debug("Send: %s", "{}")
self.ws.send("{}")
Expand Down
201 changes: 200 additions & 1 deletion autopush/tests/test_rs_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,22 @@
last message as production currently uses.
"""
import json
import logging
import os
import re
import socket
import time
import uuid
from contextlib import contextmanager
from http.server import BaseHTTPRequestHandler, HTTPServer
from httplib import HTTPResponse # noqa
from mock import Mock, call
from threading import Thread, Event
from unittest.case import SkipTest

import ecdsa
import requests
import twisted.internet.base
from cryptography.fernet import Fernet
from typing import Optional # noqa
Expand Down Expand Up @@ -45,12 +51,38 @@
MESSAGE_TABLE = os.environ.get("MESSAGE_TABLE", "message_int_test")


def get_free_port():
s = socket.socket(socket.AF_INET, type=socket.SOCK_STREAM)
s.bind(('localhost', 0))
address, port = s.getsockname()
s.close()
return port


def setup_module():
logging.getLogger('boto').setLevel(logging.CRITICAL)
if "SKIP_INTEGRATION" in os.environ: # pragma: nocover
raise SkipTest("Skipping integration tests")


class MockMegaphoneRequestHandler(BaseHTTPRequestHandler):
API_PATTERN = re.compile(r'/v1/broadcasts')
services = {}
polled = Event()

def do_GET(self):
if re.search(self.API_PATTERN, self.path):
self.send_response(requests.codes.ok)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
response_content = json.dumps(
{"broadcasts": self.services}
)
self.wfile.write(response_content.encode('utf-8'))
self.polled.set()
return


class TestRustWebPush(unittest.TestCase):
connection_port = 9050
endpoint_port = 9060
Expand Down Expand Up @@ -96,6 +128,7 @@ def setUp(self):
auto_ping_timeout=10.0,
close_handshake_timeout=5,
max_connections=5000,
human_logs=False,
**self.conn_kwargs()
)

Expand All @@ -111,7 +144,7 @@ def setUp(self):
# Websocket server
self.conn = conn = RustConnectionApplication(
conn_conf,
resource=autopush.tests.boto_resource
resource=autopush.tests.boto_resource,
)
conn.setup(rotate_tables=False, num_threads=2)
conn.startService()
Expand Down Expand Up @@ -709,3 +742,169 @@ def test_with_key(self):
status=401)

yield self.shut_down(client)


class TestRustWebPushBroadcast(unittest.TestCase):
connection_port = 9050
endpoint_port = 9060
router_port = 9070

_endpoint_defaults = dict(
hostname='localhost',
port=endpoint_port,
endpoint_port=endpoint_port,
endpoint_scheme='http',
router_port=router_port,
statsd_host=None,
router_table=dict(tablename=ROUTER_TABLE),
message_table=dict(tablename=MESSAGE_TABLE),
use_cryptography=True,
)

_conn_defaults = dict(
hostname='localhost',
port=connection_port,
endpoint_port=endpoint_port,
router_port=router_port,
endpoint_scheme='http',
statsd_host=None,
router_table=dict(tablename=ROUTER_TABLE),
message_table=dict(tablename=MESSAGE_TABLE),
use_cryptography=True,
human_logs=False,
)

def setUp(self):
# Megaphone API mock
mock_server_port = get_free_port()
MockMegaphoneRequestHandler.services = {}
MockMegaphoneRequestHandler.polled.clear()
mock_server = HTTPServer(('localhost', mock_server_port),
MockMegaphoneRequestHandler)
mock_server_thread = Thread(target=mock_server.serve_forever)
mock_server_thread.setDaemon(True)
mock_server_thread.start()
self.mock_server_thread = mock_server_thread
self.mock_megaphone = MockMegaphoneRequestHandler

self.logs = TestingLogObserver()
begin_or_register(self.logs)
self.addCleanup(globalLogPublisher.removeObserver, self.logs)

megaphone_api_url = 'http://localhost:{port}/v1/broadcasts'.format(
port=mock_server.server_port)

crypto_key = Fernet.generate_key()
ep_conf = AutopushConfig(
crypto_key=crypto_key,
**self.endpoint_kwargs()
)
conn_conf = AutopushConfig(
crypto_key=crypto_key,
auto_ping_interval=0.5,
auto_ping_timeout=10.0,
close_handshake_timeout=5,
max_connections=5000,
megaphone_api_url=megaphone_api_url,
megaphone_poll_interval=1,
**self.conn_kwargs()
)

# Endpoint HTTP router
self.ep = ep = EndpointApplication(
ep_conf,
resource=autopush.tests.boto_resource
)
ep.setup(rotate_tables=False)
ep.startService()
self.addCleanup(ep.stopService)

# Websocket server
self.conn = conn = RustConnectionApplication(
conn_conf,
resource=autopush.tests.boto_resource
)
conn.setup(rotate_tables=False, num_threads=2)
conn.startService()
self.addCleanup(conn.stopService)

def endpoint_kwargs(self):
return self._endpoint_defaults

def conn_kwargs(self):
return self._conn_defaults

@inlineCallbacks
def shut_down(self, client=None):
if client:
yield client.disconnect()

@property
def _ws_url(self):
return "ws://localhost:{}/".format(self.connection_port)

@inlineCallbacks
def test_broadcast_update_on_connect(self):
self.mock_megaphone.services = {"kinto:123": "ver1"}
self.mock_megaphone.polled.clear()
self.mock_megaphone.polled.wait()

old_ver = {"kinto:123": "ver0"}
client = Client(self._ws_url)
yield client.connect()
result = yield client.hello(services=old_ver)
assert result != {}
assert result["use_webpush"] is True
assert result["broadcasts"]["kinto:123"] == "ver1"

self.mock_megaphone.services = {"kinto:123": "ver2"}
self.mock_megaphone.polled.clear()
self.mock_megaphone.polled.wait()

result = yield client.get_broadcast(2)
assert result["broadcasts"]["kinto:123"] == "ver2"

yield self.shut_down(client)

@inlineCallbacks
def test_broadcast_subscribe(self):
self.mock_megaphone.services = {"kinto:123": "ver1"}
self.mock_megaphone.polled.clear()
self.mock_megaphone.polled.wait()

old_ver = {"kinto:123": "ver0"}
client = Client(self._ws_url)
yield client.connect()
result = yield client.hello()
assert result != {}
assert result["use_webpush"] is True
assert result["broadcasts"] == {}

client.broadcast_subscribe(old_ver)
result = yield client.get_broadcast()
assert result["broadcasts"]["kinto:123"] == "ver1"

self.mock_megaphone.services = {"kinto:123": "ver2"}
self.mock_megaphone.polled.clear()
self.mock_megaphone.polled.wait()

result = yield client.get_broadcast(2)
assert result["broadcasts"]["kinto:123"] == "ver2"

yield self.shut_down(client)

@inlineCallbacks
def test_broadcast_no_changes(self):
self.mock_megaphone.services = {"kinto:123": "ver1"}
self.mock_megaphone.polled.clear()
self.mock_megaphone.polled.wait()

old_ver = {"kinto:123": "ver1"}
client = Client(self._ws_url)
yield client.connect()
result = yield client.hello(services=old_ver)
assert result != {}
assert result["use_webpush"] is True
assert result["broadcasts"] == {}

yield self.shut_down(client)
16 changes: 13 additions & 3 deletions autopush_rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion autopush_rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ libc = "0.2.36"
# log: Use this for release builds (leave in for commits)
log = { version = "0.4.1", features = ["max_level_trace", "release_max_level_warn"] }
openssl = "0.10.2"
reqwest = { version = "0.8.4", features = ["unstable"] }
reqwest = { version = "0.8.5", features = ["unstable"] }
sentry = "0.2.0"
serde = "1.0.27"
serde_derive = "1.0.27"
Expand Down
2 changes: 1 addition & 1 deletion autopush_rs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, conf, queue):
cfg.ssl_cert = ffi_from_buffer(conf.ssl.cert)
cfg.ssl_dh_param = ffi_from_buffer(conf.ssl.dh_param)
cfg.ssl_key = ffi_from_buffer(conf.ssl.key)
cfg.json_logging = True
cfg.json_logging = conf.human_logs
cfg.statsd_host = ffi_from_buffer(conf.statsd_host)
cfg.statsd_port = conf.statsd_port
cfg.megaphone_api_url = ffi_from_buffer(conf.megaphone_api_url)
Expand Down
Loading

0 comments on commit 2188fe0

Please sign in to comment.