Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
bug: Wrap twisted formatEvent in a try handler
Browse files Browse the repository at this point in the history
Declare the arguments for raven to hopefully prevent key corruption.
NOTE: twisted logging appears to be fairly touchy about parameters. We
fully specify the parameter names to prevent issues with arguments being
used in unexpected ways and generating odd exceptions in handling.
Because errors in error handling just mean higher bar bills.

Closes #460
  • Loading branch information
jrconlin committed May 6, 2016
1 parent e79bcf6 commit 61eb51b
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 55 deletions.
69 changes: 40 additions & 29 deletions autopush/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,16 @@ def _response_err(self, fail):
running.
"""
self.log.failure(fail, status_code=500, errno=999, **self._client_info)
format = fail.value.message or 'Exception'
self.log.failure(format=format, failure=fail,
status_code=500, errno=999, **self._client_info)
self._write_response(500, 999)

def _overload_err(self, fail):
"""errBack for throughput provisioned exceptions"""
fail.trap(ProvisionedThroughputExceededException)
self.log.info("Throughput Exceeded", status_code=503, errno=201,
**self._client_info)
self.log.info(format="Throughput Exceeded", status_code=503,
errno=201, **self._client_info)
self._write_response(503, 201)

def _router_response(self, response):
Expand All @@ -249,15 +251,18 @@ def _router_fail_err(self, fail):
exc = fail.value
if isinstance(exc, RouterException):
if exc.log_exception or exc.status_code >= 500:
self.log.failure(fail, status_code=exc.status_code,
format = fail.value.message or 'Exception'
self.log.failure(format=format,
failure=fail, status_code=exc.status_code,
errno=exc.errno or "",
**self._client_info) # pragma nocover
if 200 <= exc.status_code < 300:
self.log.info("Success", status_code=exc.status_code,
self.log.info(format="Success", status_code=exc.status_code,
logged_status=exc.logged_status or "",
**self._client_info)
elif 400 <= exc.status_code < 500:
self.log.info("Client error", status_code=exc.status_code,
self.log.info(format="Client error",
status_code=exc.status_code,
logged_status=exc.logged_status or "",
errno=exc.errno or "",
**self._client_info)
Expand All @@ -266,21 +271,23 @@ def _router_fail_err(self, fail):
def _uaid_not_found_err(self, fail):
"""errBack for uaid lookup not finding the user"""
fail.trap(ItemNotFound)
self.log.debug("UAID not found in AWS.", status_code=410, errno=103,
self.log.debug(format="UAID not found in AWS.",
status_code=410, errno=103,
**self._client_info)
self._write_response(410, 103)

def _token_err(self, fail):
"""errBack for token decryption fail"""
fail.trap(InvalidToken, InvalidTokenException)
self.log.debug("Invalid token", status_code=400, errno=102,
self.log.debug(format="Invalid token",
status_code=400, errno=102,
**self._client_info)
self._write_response(400, 102)

def _auth_err(self, fail):
"""errBack for invalid auth token"""
fail.trap(VapidAuthException)
self.log.debug("Invalid Auth token",
self.log.debug(format="Invalid Auth token",
status_code=401,
errno=109,
**self._client_info)
Expand All @@ -289,7 +296,7 @@ def _auth_err(self, fail):
def _chid_not_found_err(self, fail):
"""errBack for unknown chid"""
fail.trap(ItemNotFound, ValueError)
self.log.debug("CHID not found in AWS.",
self.log.debug(format="CHID not found in AWS.",
status_code=410, errno=106,
**self._client_info)
self._write_response(410, 106)
Expand Down Expand Up @@ -319,7 +326,7 @@ def _invalid_auth(self, fail):
message = fail.value.message or repr(fail.value)
if isinstance(fail.value, AssertionError):
message = "A decryption error occurred"
self.log.debug("Invalid bearer token: " + repr(message),
self.log.debug(format="Invalid bearer token: " + repr(message),
**self._client_info)
raise VapidAuthException("Invalid bearer token: " + repr(message))

Expand Down Expand Up @@ -393,7 +400,8 @@ def _delete_message(self, kind, uaid, chid):
return d

def _delete_completed(self, response):
self.log.info("Message Deleted", status_code=204, **self._client_info)
self.log.info(format="Message Deleted", status_code=204,
**self._client_info)
self.set_status(204)
self.finish()

Expand Down Expand Up @@ -424,9 +432,9 @@ def put(self, api_ver="v0", token=None):
crypto_key_header = self.request.headers.get('crypto-key')
content_encoding = self.request.headers.get('content-encoding', "")
if content_encoding.lower() == 'aesgcm128' and crypto_key_header:
self.log.debug("Invalid crypto state; aesgcm128 + Crypto-Key",
status_code=400, errno=110,
**self._client_info)
self.log.debug(
format="Invalid crypto state; aesgcm128 + Crypto-Key",
status_code=400, errno=110, **self._client_info)
wpe_url = ("https://developers.google.com/web/updates/2016/03/"
"web-push-encryption")
self._write_response(
Expand Down Expand Up @@ -468,8 +476,9 @@ def _uaid_lookup_results(self, result):
try:
self.router = self.ap_settings.routers[router_key]
except KeyError:
self.log.debug("Invalid router requested", status_code=400,
errno=108, **self._client_info)
self.log.debug(
format="Invalid router requested", status_code=400,
errno=108, **self._client_info)
return self._write_response(400, 108,
message="Invalid router")

Expand All @@ -487,13 +496,13 @@ def _uaid_lookup_results(self, result):
req_fields = ["content-encoding", "encryption"]
if data and not all([x in self.request.headers
for x in req_fields]):
self.log.debug("Client error", status_code=400, errno=101,
**self._client_info)
self.log.debug(format="Client error", status_code=400,
errno=101, **self._client_info)
return self._write_response(400, 101)
if ("encryption-key" in self.request.headers and
"crypto-key" in self.request.headers):
self.log.debug("Client error", status_code=400, errno=110,
**self._client_info)
self.log.debug(format="Client error", status_code=400,
errno=110, **self._client_info)
return self._write_response(
400, 110, message="Invalid crypto headers")
self._client_info["message_size"] = len(data) if data else 0
Expand All @@ -505,12 +514,12 @@ def _uaid_lookup_results(self, result):
# Cap the TTL to our MAX_TTL
ttl = min(ttl, MAX_TTL)
else:
self.log.debug("Client error", status_code=400,
self.log.debug(format="Client error", status_code=400,
errno=112, **self._client_info)
return self._write_response(400, 112, message="Invalid TTL header")

if data and len(data) > self.ap_settings.max_data:
self.log.debug("Client error", status_code=400, errno=104,
self.log.debug(format="Client error", status_code=400, errno=104,
**self._client_info)
return self._write_response(
413, 104, message="Data payload too large")
Expand Down Expand Up @@ -574,9 +583,10 @@ def _router_completed(self, response, uaid_data, warning=""):
return d
else:
if response.status_code == 200 or response.logged_status == 200:
self.log.info("Successful delivery", **self._client_info)
self.log.info(format="Successful delivery",
**self._client_info)
elif response.status_code == 202 or response.logged_status == 202:
self.log.info("Router miss, message stored.",
self.log.info(format="Router miss, message stored.",
**self._client_info)
time_diff = time.time() - self.start_time
self.metrics.timing("updates.handled", duration=time_diff)
Expand Down Expand Up @@ -631,7 +641,7 @@ def post(self, router_type="", router_token="", uaid="", chid=""):

# normalize the path vars into parameters
if router_type not in self.ap_settings.routers:
self.log.debug("Invalid router requested",
self.log.debug(format="Invalid router requested",
status_code=400, errno=108,
**self._client_info)
return self._write_response(
Expand Down Expand Up @@ -687,7 +697,7 @@ def put(self, router_type="", router_token="", uaid="", chid=""):
self.uaid = uaid
router_data = params
if router_type not in self.ap_settings.routers or not router_data:
self.log.debug("Invalid router requested", status_code=400,
self.log.debug(format="Invalid router requested", status_code=400,
errno=108, **self._client_info)
return self._write_response(
400, 108, message="Invalid router")
Expand Down Expand Up @@ -737,7 +747,7 @@ def delete(self, router_type="", router_token="", uaid="", chid=""):
return self._write_unauthorized_response(
message="Invalid Authentication")
if router_type not in self.ap_settings.routers:
self.log.debug("Invalid router requested",
self.log.debug(format="Invalid router requested",
status_code=400, errno=108,
**self._client_info)
return self._write_response(
Expand Down Expand Up @@ -810,7 +820,8 @@ def _return_endpoint(self, endpoint_data, new_uaid, router=None):
else:
msg = dict(channelID=self.chid, endpoint=endpoint_data[0])
self.write(json.dumps(msg))
self.log.debug("Endpoint registered via HTTP", **self._client_info)
self.log.debug(format="Endpoint registered via HTTP",
**self._client_info)
self.finish()

def _success(self, result):
Expand Down
3 changes: 2 additions & 1 deletion autopush/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def _check_success(self, result, name):
def _check_error(self, failure, name):
"""Returns an error, and why"""
self._healthy = False
self.log.failure(failure, name)
format = failure.value.message or "Heath Exception"
self.log.failure(format=format, failure=failure, name=name)

cause = self._health_checks[name] = {"status": "NOT OK"}
if failure.check(InternalServerError):
Expand Down
7 changes: 5 additions & 2 deletions autopush/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __call__(self, event):
f = event["failure"]
reactor.callFromThread(
self.raven_client.captureException,
(f.type, f.value, f.getTracebackObject())
exc_info=(f.type, f.value, f.getTracebackObject())
)

text = self.format_event(event)
Expand Down Expand Up @@ -124,8 +124,11 @@ def json_format(self, event):
"Logger": self.logger_name,
}
# Add the nicely formatted message
msg["Fields"]["message"] = formatEvent(event)

try:
msg["Fields"]["message"] = formatEvent(event)
except Exception as exp:
msg["Fields"]["message"] = {"format_error": repr(exp)}
return json.dumps(msg, skipkeys=True) + "\n"

def start(self):
Expand Down
4 changes: 2 additions & 2 deletions autopush/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ def make_settings(args, **kwargs):
# unaccounted.
senderID = senderIDs.choose_ID()
if senderID is None:
log.critical("No GCM SenderIDs specified or found.")
log.critical(format="No GCM SenderIDs specified or found.")
return
router_conf["gcm"] = {"ttl": args.gcm_ttl,
"dryrun": args.gcm_dryrun,
Expand Down Expand Up @@ -468,7 +468,7 @@ def endpoint_main(sysargs=None, use_files=True):
try:
senderid_list = json.loads(args.senderid_list)
except (ValueError, TypeError):
log.critical("Invalid JSON specified for senderid_list")
log.critical(format="Invalid JSON specified for senderid_list")
return

log_level = args.log_level or ("debug" if args.debug else "info")
Expand Down
17 changes: 11 additions & 6 deletions autopush/senderids.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,16 @@ def __init__(self, args):
self.service = LoopingCall(self._refresh)
if senderIDs:
if type(senderIDs) is not dict:
self.log.critical("senderid_list is not a dict. Ignoring")
self.log.critical(
format="senderid_list is not a dict. Ignoring")
else:
# We're initializing, so it's ok to block.
self.update(senderIDs)

def start(self):
if self._use_s3:
self.log.debug("Starting SenderID service...")
self.log.debug(
format="Starting SenderID service...")
self.service.start(self._expry)

def _write(self, senderIDs, *args):
Expand Down Expand Up @@ -94,8 +96,9 @@ def _update_senderIDs(self, *args):
candidates = json.loads(key.get_contents_as_string())
if candidates:
if type(candidates) is not dict:
self.log.critical("Wrong data type stored for senderIDs. "
"Should be dict. Ignoring.")
self.log.critical(
format=("Wrong data type stored for senderIDs. "
"Should be dict. Ignoring."))
return
return candidates

Expand All @@ -117,7 +120,8 @@ def update(self, senderIDs):
if not senderIDs:
return
if type(senderIDs) is not dict:
self.log.critical("Wrong data type for senderIDs. Should be dict.")
self.log.critical(
format="Wrong data type for senderIDs. Should be dict.")
return
if not self._use_s3:
# Skip using s3 (For debugging)
Expand Down Expand Up @@ -149,5 +153,6 @@ def choose_ID(self):

def stop(self):
if self.service and self.service.running:
self.log.debug("Stopping SenderID service...")
self.log.debug(
format="Stopping SenderID service...")
self.service.stop()
5 changes: 3 additions & 2 deletions autopush/tests/test_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@ def handle_finish(result):
self.endpoint.set_header.assert_called_with(
"Location", "Somewhere")
args, kwargs = self.endpoint.log.info.call_args
eq_("Successful delivery", args[0])
eq_("Successful delivery", kwargs.get('format') or args[0])
self.finish_deferred.addCallback(handle_finish)

self.endpoint.post(None, dummy_uaid)
Expand All @@ -1092,7 +1092,8 @@ def handle_finish(result):
self.endpoint.set_header.assert_called_with(
"Location", "Somewhere")
args, kwargs = self.endpoint.log.info.call_args
eq_("Router miss, message stored.", args[0])
eq_("Router miss, message stored.",
kwargs.get('format') or args[0])
self.finish_deferred.addCallback(handle_finish)

self.endpoint.post(None, dummy_uaid)
Expand Down
14 changes: 14 additions & 0 deletions autopush/tests/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,20 @@ def test_firehose_only_output(self, mock_boto3):
eq_(len(obj.firehose.mock_calls), 3)
eq_(len(obj.firehose.process.mock_calls), 1)

@patch("autopush.logging.formatEvent")
def test_bad_format(self, mock_format):

def raise_exc(*args, **kwargs):
raise Exception('Oops')

obj = PushLogger.setup_logging("Autopush")
obj._output = mock_stdout = Mock()
mock_format.side_effect = raise_exc
log.info("omg!", Type=7)
eq_(len(mock_stdout.mock_calls), 2)
kwargs = mock_stdout.mock_calls[0][1][0]
ok_("format_error" in kwargs)


class FirehoseProcessorTestCase(twisted.trial.unittest.TestCase):
def setUp(self):
Expand Down
6 changes: 3 additions & 3 deletions autopush/tests/test_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,7 @@ def check_hello_result(msg):
eq_(len(self.proto.ps.direct_updates), 0)
eq_(len(self.proto.log.info.mock_calls), 1)
args, kwargs = self.proto.log.info.call_args
eq_(args[0], "Ack")
eq_(kwargs.get('format') or args[0], "Ack")
eq_(kwargs["router_key"], "simplepush")
eq_(kwargs["message_source"], "direct")

Expand Down Expand Up @@ -1358,7 +1358,7 @@ def test_ack_with_webpush_direct(self):
eq_(self.proto.ps.direct_updates[chid], [])
eq_(len(self.proto.log.info.mock_calls), 1)
args, kwargs = self.proto.log.info.call_args
eq_(args[0], "Ack")
eq_(kwargs.get('format') or args[0], "Ack")
eq_(kwargs["router_key"], "webpush")
eq_(kwargs["message_source"], "direct")

Expand All @@ -1383,7 +1383,7 @@ def test_ack_with_webpush_from_storage(self):
assert mock_defer.addBoth.called
eq_(len(self.proto.log.info.mock_calls), 1)
args, kwargs = self.proto.log.info.call_args
eq_(args[0], "Ack")
eq_(kwargs.get('format') or args[0], "Ack")
eq_(kwargs["router_key"], "webpush")
eq_(kwargs["message_source"], "stored")

Expand Down
7 changes: 5 additions & 2 deletions autopush/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ def write_error(self, code, **kwargs):
"""
self.set_status(code)
if "exc_info" in kwargs:
self.log.failure(failure.Failure(*kwargs["exc_info"]),
**self._client_info)
format = kwargs.get("format", "Exception")
self.log.failure(
format=format,
failure=failure.Failure(*kwargs["exc_info"]),
**self._client_info)
else:
self.log.failure("Error in handler: %s" % code,
**self._client_info)
Expand Down
Loading

0 comments on commit 61eb51b

Please sign in to comment.