Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
fix: move metric increments to lowest callbacks
Browse files Browse the repository at this point in the history
The metric increments were being called for registration API
calls due to an error callback. They weren't called for success
cases as well. Moving them to the lower callbacks with a new
flag should help ensure they're incremented correctly.

Also fixed message_data calls to use increment instead of a gauge
and removed sending of base_tags in websocket.py to avoid too many
metric values.

Closes #958
  • Loading branch information
bbangert committed Jul 19, 2017
1 parent 261b95e commit 6796949
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 76 deletions.
15 changes: 8 additions & 7 deletions autopush/router/apnsrouter.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,15 @@ def _route(self, notification, router_data):
application=rel_channel))

self.metrics.increment(
"updates.client.bridge.apns.%s.sent" %
router_data["rel_channel"],
self._base_tags
"updates.client.bridge.apns.{}.sent".format(
router_data["rel_channel"]
),
tags=self._base_tags
)
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination='Direct'))
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination='Direct'))
return RouterResponse(status_code=201, response_body="",
headers={"TTL": notification.ttl,
"Location": location},
Expand Down
10 changes: 5 additions & 5 deletions autopush/router/fcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,11 +267,11 @@ def _process_reply(self, reply, notification, router_data, ttl):
router_data={},
)
self.metrics.increment("notification.bridge.sent",
self._base_tags)
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination="Direct"))
tags=self._base_tags)
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination="Direct"))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
return RouterResponse(status_code=201, response_body="",
Expand Down
10 changes: 5 additions & 5 deletions autopush/router/gcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ def _process_reply(self, reply, uaid_data, ttl, notification):
log_exception=False)

self.metrics.increment("notification.bridge.sent",
self._base_tags)
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination='Direct'))
tags=self._base_tags)
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination='Direct'))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.version)
return RouterResponse(status_code=201, response_body="",
Expand Down
12 changes: 6 additions & 6 deletions autopush/router/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ def amend_endpoint_response(self, response, router_data):
"""Stubbed out for this router"""

def stored_response(self, notification):
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(destination='Stored'))
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(destination='Stored'))
return RouterResponse(202, "Notification Stored")

def delivered_response(self, notification):
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(destination='Direct'))
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(destination='Direct'))
return RouterResponse(200, "Delivered")

@inlineCallbacks
Expand Down
12 changes: 6 additions & 6 deletions autopush/router/webpush.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ class WebPushRouter(SimpleRouter):
"""SimpleRouter subclass to store individual messages appropriately"""

def delivered_response(self, notification):
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(destination='Stored'))
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(destination='Stored'))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.location)
return RouterResponse(status_code=201, response_body="",
Expand All @@ -36,9 +36,9 @@ def delivered_response(self, notification):
logged_status=200)

def stored_response(self, notification):
self.metrics.gauge("notification.message_data",
notification.data_length,
tags=make_tags(destination='Direct'))
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(destination='Direct'))
location = "%s/m/%s" % (self.ap_settings.endpoint_url,
notification.location)
return RouterResponse(status_code=201, response_body="",
Expand Down
9 changes: 5 additions & 4 deletions autopush/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,8 +722,8 @@ def test_webpush_data_delivery_to_disconnected_client(self, m_ddog):

ok_(self.logs.logged_ci(lambda ci: 'message_size' in ci),
"message_size not logged")
eq_(self.conn.db.metrics._client.gauge.call_args[1]['tags'],
['source:Stored'])
inc_call = self.conn.db.metrics._client.increment.call_args_list[5]
eq_(inc_call[1]['tags'], ['source:Stored'])
yield self.shut_down(client)

@inlineCallbacks
Expand Down Expand Up @@ -1266,8 +1266,9 @@ def test_message_with_topic(self):
client = yield self.quick_register(use_webpush=True)
yield client.send_notification(data=data, topic="topicname")
self.conn.db.metrics.increment.assert_has_calls([
call('updates.notification.topic',
tags=['host:localhost', 'use_webpush:True'])
call('ua.command.hello'),
call('ua.command.register'),
call('ua.notification.topic')
])
yield self.shut_down(client)

Expand Down
2 changes: 1 addition & 1 deletion autopush/tests/test_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -1755,7 +1755,7 @@ def test_notif_finished_with_too_many_messages(self):
d = Deferred()

def check(*args, **kwargs):
eq_(self.metrics.gauge.call_args[1]['tags'], ["source:Direct"])
eq_(self.metrics.increment.call_args[1]['tags'], ["source:Direct"])
ok_(self.proto.force_retry.called)
ok_(self.send_mock.called)
d.callback(True)
Expand Down
48 changes: 29 additions & 19 deletions autopush/web/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def initialize(self):
self._base_tags = {}
self._start_time = time.time()
self._timings = {}
self._handling_message = False

@property
def routers(self):
Expand Down Expand Up @@ -186,7 +187,9 @@ def head(self, *args, **kwargs):
#############################################################
def _write_response(self, status_code, errno, message=None, error=None,
headers=None,
url=DEFAULT_ERR_URL):
url=DEFAULT_ERR_URL,
router_type=None,
vapid=None):
"""Writes out a full JSON error and sets the appropriate status"""
self.set_status(status_code, reason=error)
error_data = dict(
Expand All @@ -207,6 +210,13 @@ def _write_response(self, status_code, errno, message=None, error=None,
if status_code == 410:
self.set_header("Cache-Control", "max-age=86400")

if self._handling_message and status_code >= 300:
self.metrics.increment('notification.message.error',
tags=[
"code:{}".format(status_code),
"router:{}".format(router_type),
"vapid:{}".format(vapid is not None)
])
self._track_timing()
self.finish()

Expand Down Expand Up @@ -255,27 +265,41 @@ def _boto_err(self, fail):
self._write_response(503, errno=202,
message="Communication error, please retry")

def _router_response(self, response):
def _router_response(self, response, router_type=None, vapid=False):
for name, val in response.headers.items():
if val is not None:
self.set_header(name, val)

if 200 <= response.status_code < 300:
self.set_status(response.status_code, reason=None)
self.write(response.response_body)

dest = 'Direct'
if response.status_code == 202 or response.logged_status == 202:
dest = 'Stored'

if self._handling_message:
self.metrics.increment('notification.message.success',
tags=[
'destination:{}'.format(dest),
'router:{}'.format(router_type),
'vapid:{}'.format(vapid is not None)
])
self._track_timing(status_code=response.logged_status)
self.finish()
else:
self._write_response(
response.status_code,
errno=response.errno or 999,
message=response.response_body)
message=response.response_body,
router_type=router_type,
vapid=vapid
)

def _router_fail_err(self, fail, router_type=None, vapid=False):
"""errBack for router failures"""
fail.trap(RouterException)
exc = fail.value
success = False
if exc.log_exception:
if exc.status_code >= 500:
fmt = fail.value.message or 'Exception'
Expand All @@ -288,27 +312,13 @@ def _router_fail_err(self, fail, router_type=None, vapid=False):
self.log.debug(format="Success", status_code=exc.status_code,
logged_status=exc.logged_status or 0,
client_info=self._client_info)
success = True
self.metrics.increment('notification.message.success',
tags=[
'destination:Direct',
'router:{}'.format(router_type),
'vapid:{}'.format(vapid is not None)
])
elif 400 <= exc.status_code < 500:
self.log.debug(format="Client error",
status_code=exc.status_code,
logged_status=exc.logged_status or 0,
errno=exc.errno or 0,
client_info=self._client_info)
if not success:
self.metrics.increment('notification.message.error',
tags=[
"code:{}".format(exc.status_code),
"router:{}".format(router_type),
"vapid:{}".format(vapid is not None)
])
self._router_response(exc)
self._router_response(exc, router_type, vapid)

def _write_validation_err(self, errors):
"""Writes a set of validation errors out with details about what
Expand Down
5 changes: 5 additions & 0 deletions autopush/web/simplepush.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ def extract_fields(self, d):
class SimplePushHandler(BaseWebHandler):
cors_methods = "PUT"

def initialize(self):
"""Must run on initialization to set ahead of validation"""
super(SimplePushHandler, self).initialize()
self._handling_message = True

@threaded_validate(SimplePushRequestSchema)
def put(self, subscription, version, data):
# type: (Dict[str, Any], str, str) -> Deferred
Expand Down
14 changes: 5 additions & 9 deletions autopush/web/webpush.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,11 @@ class WebPushHandler(BaseWebHandler):
"authorization")
cors_response_headers = ("location", "www-authenticate")

def initialize(self):
"""Must run on initialization to set ahead of validation"""
super(WebPushHandler, self).initialize()
self._handling_message = True

@threaded_validate(WebPushRequestSchema)
def post(self,
subscription, # type: Dict[str, Any]
Expand Down Expand Up @@ -520,24 +525,15 @@ def _router_completed(self, response, uaid_data, warning="",
return d
else:
# No changes are requested by the bridge system, proceed as normal
dest = 'Direct'
if response.status_code == 200 or response.logged_status == 200:
self.log.debug(format="Successful delivery",
client_info=self._client_info)
elif response.status_code == 202 or response.logged_status == 202:
self.log.debug(
format="Router miss, message stored.",
client_info=self._client_info)
dest = 'Stored'
self.metrics.timing("notification.request_time",
duration=time_diff)
self.metrics.increment('notification.message.success',
tags=make_tags(
destination=dest,
router=router_type,
vapid=(vapid is not None))
)

response.response_body = (
response.response_body + " " + warning).strip()
self._router_response(response)
22 changes: 8 additions & 14 deletions autopush/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,6 @@ def _verify_user_record(self):
uaid_hash=self.ps.uaid_hash,
uaid_record=dump_uaid(record))
tags = ['code:104']
tags.extend(self.base_tags or [])
self.metrics.increment("ua.expiration", tags=tags)
self.force_retry(self.db.router.drop_user, self.ps.uaid)
return None
Expand All @@ -835,7 +834,6 @@ def _verify_user_record(self):
self.force_retry(self.db.router.drop_user,
self.ps.uaid)
tags = ['code:105']
tags.extend(self.base_tags or [])
self.metrics.increment("ua.expiration", tags=tags)
return None

Expand Down Expand Up @@ -925,7 +923,7 @@ def finish_hello(self, previous):
self.sendJSON(msg)
self.log.debug(format="hello", uaid_hash=self.ps.uaid_hash,
**self.ps.raw_agent)
self.metrics.increment("ua.command.hello", tags=self.base_tags)
self.metrics.increment("ua.command.hello")
self.process_notifications()

def process_notifications(self):
Expand Down Expand Up @@ -1103,10 +1101,9 @@ def finish_webpush_notifications(self, result):
if self.sent_notification_count > self.ap_settings.msg_limit:
raise MessageOverloadException()
if notif.topic:
self.metrics.increment("notification.topic",
tags=self.base_tags)
self.metrics.gauge('ua.message_data', len(msg.get('data', '')),
tags=make_tags(source=notif.source))
self.metrics.increment("ua.notification.topic")
self.metrics.increment('ua.message_data', len(msg.get('data', '')),
tags=make_tags(source=notif.source))
self.sendJSON(msg)

# Did we send any messages?
Expand Down Expand Up @@ -1254,7 +1251,7 @@ def send_register_finish(self, result, endpoint, chid):
"status": 200
}
self.sendJSON(msg)
self.metrics.increment("ua.command.register", tags=self.base_tags)
self.metrics.increment("ua.command.register")
self.ps.stats.registers += 1
self.log.info(format="Register", channel_id=chid,
endpoint=endpoint,
Expand All @@ -1272,8 +1269,7 @@ def process_unregister(self, data):
except ValueError:
return self.bad_message("unregister", "Invalid ChannelID")

self.metrics.increment("ua.command.unregister",
tags=self.base_tags)
self.metrics.increment("ua.command.unregister")
self.ps.stats.unregisters += 1
event = dict(format="Unregister", channel_id=chid,
uaid_hash=self.ps.uaid_hash,
Expand Down Expand Up @@ -1435,7 +1431,7 @@ def process_ack(self, data):
if not updates or not isinstance(updates, list):
return

self.metrics.increment("ua.command.ack", tags=self.base_tags)
self.metrics.increment("ua.command.ack")
defers = filter(None, map(self.ack_update, updates))

if defers:
Expand All @@ -1456,7 +1452,6 @@ def process_nack(self, data):
user_agent=self.ps.user_agent, message_id=str(version),
code=code, **self.ps.raw_agent)
tags = ["code:401"]
tags.extend(self.base_tags or [])
self.metrics.increment('ua.command.nack', tags=tags)
self.ps.stats.nacks += 1

Expand Down Expand Up @@ -1516,8 +1511,7 @@ def send_notification(self, update):
update)
self.ps.direct_updates[chid].append(notif)
if notif.topic:
self.metrics.increment("updates.notification.topic",
tags=self.base_tags)
self.metrics.increment("ua.notification.topic")
self.sendJSON(notif.websocket_format())
else:
self.ps.direct_updates[chid] = version
Expand Down

0 comments on commit 6796949

Please sign in to comment.