chore(pubsub): clean up log messages
Main goal: ensure that `hash(message)` is useful, e.g. that log messages
can be relied on to be constant for cases where it is "the same error"
and different for cases where it is not. This allows clients to do
things such as alerting when the rate of a given error message rises
above a given threshold.

While I was here, I cleaned up the other log strings in the file to make
'em all consistently typed/formatted/etc.
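To make the goal concrete, here is a minimal, hypothetical sketch (not part of this commit) of the kind of client-side aggregation constant messages enable: a logging handler that buckets records by the unformatted message template (`record.msg`) and alerts when one bucket exceeds a rate threshold. The class name, threshold, window, and logger name are illustrative assumptions.

```python
# Hypothetical client-side sketch: alert when one constant log message fires
# too often. Keys on hash(record.msg) -- the unformatted template -- which
# stays stable now that variable data lives in format args / ``extra``.
import collections
import logging
import time


class ErrorRateAlerter(logging.Handler):
    def __init__(self, threshold: int = 50, window: float = 60.0) -> None:
        super().__init__(level=logging.WARNING)
        self.threshold = threshold   # max occurrences per window (made-up value)
        self.window = window         # rolling window in seconds (made-up value)
        self.seen: dict = collections.defaultdict(list)

    def emit(self, record: logging.LogRecord) -> None:
        key = hash(record.msg)       # constant per "the same error"
        now = time.monotonic()
        recent = [t for t in self.seen[key] if now - t <= self.window] + [now]
        self.seen[key] = recent
        if len(recent) > self.threshold:
            print(f'ALERT: {record.msg!r} logged {len(recent)} times '
                  f'in the last {self.window:.0f}s')


# assuming the subscriber module's logger is named after its import path
logging.getLogger('gcloud.aio.pubsub.subscriber').addHandler(ErrorRateAlerter())
```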
TheKevJames committed Dec 14, 2022
1 parent 1d4edae commit 7d63398
Showing 1 changed file with 33 additions and 35 deletions.
68 changes: 33 additions & 35 deletions pubsub/gcloud/aio/pubsub/subscriber.py
@@ -29,6 +29,7 @@
float]]
else:
MessageQueue = asyncio.Queue
+
ApplicationHandler = Callable[[SubscriberMessage], Awaitable[None]]
T = TypeVar('T')

@@ -52,8 +53,8 @@ async def refresh(self) -> None:
self.subscription)
self.ack_deadline = float(sub['ackDeadlineSeconds'])
except Exception as e:
-log.warning(
-'Failed to refresh ackDeadlineSeconds value', exc_info=e)
+log.warning('failed to refresh ackDeadlineSeconds value',
+exc_info=e)
self.last_refresh = time.perf_counter()

def cache_outdated(self) -> bool:
@@ -91,9 +92,8 @@ async def acker(subscription: str,
# acknowledge endpoint limit is 524288 bytes
# which is ~2744 ack_ids
if len(ack_ids) > 2500:
-log.error(
-'acker is falling behind, dropping %d unacked messages',
-len(ack_ids) - 2500)
+log.error('acker is falling behind, dropping unacked messages',
+extra={'count': len(ack_ids) - 2500})
ack_ids = ack_ids[-2500:]
for _ in range(len(ack_ids) - 2500):
ack_queue.task_done()
Expand All @@ -105,17 +105,16 @@ async def acker(subscription: str,
ack_queue.task_done()
except aiohttp.client_exceptions.ClientResponseError as e:
if e.status == 400:
-log.error(
-'Ack error is unrecoverable, '
-'one or more messages may be dropped', exc_info=e)
+log.exception('unrecoverable ack error, one or more '
+'messages may be dropped: %s', e)

async def maybe_ack(ack_id: str) -> None:
try:
await subscriber_client.acknowledge(
subscription,
ack_ids=[ack_id])
except Exception as ex:
-log.warning('Ack failed for ack_id=%s', ack_id,
+log.warning('ack failed', extra={'ack_id': ack_id},
exc_info=ex)
finally:
ack_queue.task_done()
@@ -124,8 +123,8 @@ async def maybe_ack(ack_id: str) -> None:
asyncio.ensure_future(maybe_ack(ack_id))
ack_ids = []

-log.warning(
-'Ack request failed, better luck next batch', exc_info=e)
+log.warning('ack request failed, better luck next batch',
+exc_info=e)
metrics_client.increment('pubsub.acker.batch.failed')
metrics.BATCH_STATUS.labels(component='acker',
outcome='failed').inc()
@@ -134,8 +133,8 @@ async def maybe_ack(ack_id: str) -> None:
except asyncio.CancelledError:
raise
except Exception as e:
-log.warning(
-'Ack request failed, better luck next batch', exc_info=e)
+log.warning('ack request failed, better luck next batch',
+exc_info=e)
metrics_client.increment('pubsub.acker.batch.failed')
metrics.BATCH_STATUS.labels(component='acker',
outcome='failed').inc()
@@ -165,9 +164,8 @@ async def nacker(subscription: str,
# modifyAckDeadline endpoint limit is 524288 bytes
# which is ~2744 ack_ids
if len(ack_ids) > 2500:
-log.error(
-'nacker is falling behind, dropping %d unacked messages',
-len(ack_ids) - 2500)
+log.error('nacker is falling behind, dropping unacked '
+'messages', extra={'count': len(ack_ids) - 2500})
ack_ids = ack_ids[-2500:]
for _ in range(len(ack_ids) - 2500):
nack_queue.task_done()
Expand All @@ -180,9 +178,8 @@ async def nacker(subscription: str,
nack_queue.task_done()
except aiohttp.client_exceptions.ClientResponseError as e:
if e.status == 400:
-log.error(
-'Nack error is unrecoverable, '
-'one or more messages may be dropped', exc_info=e)
+log.exception('unrecoverable nack error, one or more '
+'messages may be dropped: %s', e)

async def maybe_nack(ack_id: str) -> None:
try:
@@ -191,16 +188,16 @@ async def maybe_nack(ack_id: str) -> None:
ack_ids=[ack_id],
ack_deadline_seconds=0)
except Exception as ex:
-log.warning('Nack failed for ack_id=%s', ack_id,
-exc_info=ex)
+log.warning('nack failed',
+extra={'ack_id': ack_id}, exc_info=ex)
finally:
nack_queue.task_done()
for ack_id in ack_ids:
asyncio.ensure_future(maybe_nack(ack_id))
ack_ids = []

-log.warning(
-'Nack request failed, better luck next batch', exc_info=e)
+log.warning('nack request failed, better luck next batch',
+exc_info=e)
metrics_client.increment('pubsub.nacker.batch.failed')
metrics.BATCH_STATUS.labels(
component='nacker', outcome='failed').inc()
@@ -209,8 +206,8 @@ async def maybe_nack(ack_id: str) -> None:
except asyncio.CancelledError:
raise
except Exception as e:
-log.warning(
-'Nack request failed, better luck next batch', exc_info=e)
+log.warning('nack request failed, better luck next batch',
+exc_info=e)
metrics_client.increment('pubsub.nacker.batch.failed')
metrics.BATCH_STATUS.labels(
component='nacker', outcome='failed').inc()
@@ -247,13 +244,13 @@ async def _execute_callback(message: SubscriberMessage,
except asyncio.CancelledError:
if nack_queue:
await nack_queue.put(message.ack_id)
-log.warning('Application callback was cancelled')
+log.warning('application callback was cancelled')
metrics_client.increment('pubsub.consumer.cancelled')
metrics.CONSUME.labels(outcome='cancelled').inc()
-except Exception:
+except Exception as e:
if nack_queue:
await nack_queue.put(message.ack_id)
-log.exception('Application callback raised an exception')
+log.exception('application callback raised an exception: %s', e)
metrics_client.increment('pubsub.consumer.failed')
metrics.CONSUME.labels(outcome='failed').inc()

@@ -303,14 +300,14 @@ async def _consume_one(message: SubscriberMessage,
message, pulled_at = await message_queue.get()
await asyncio.shield(_consume_one(message, pulled_at))
except asyncio.CancelledError:
-log.info('Consumer worker cancelled. Gracefully terminating...')
+log.info('consumer worker cancelled, gracefully terminating...')
for _ in range(max_tasks):
await semaphore.acquire()

await ack_queue.join()
if nack_queue:
await nack_queue.join()
-log.info('Consumer terminated gracefully.')
+log.info('consumer terminated gracefully')
raise

async def producer(
@@ -349,7 +346,7 @@ async def producer(

await message_queue.join()
except asyncio.CancelledError:
-log.info('Producer worker cancelled. Gracefully terminating...')
+log.info('producer worker cancelled, gracefully terminating...')

if not pull_task.done():
# Leaving the connection hanging can result in redelivered
@@ -364,7 +361,7 @@ async def producer(
await message_queue.put((m, pulled_at))

await message_queue.join()
-log.info('Producer terminated gracefully.')
+log.info('producer terminated gracefully')
raise

async def subscribe(subscription: str, # pylint: disable=too-many-locals
@@ -438,9 +435,9 @@ async def subscribe(subscription: str, # pylint: disable=too-many-locals
return_when=asyncio.FIRST_COMPLETED)
for task in done:
task.result()
-raise Exception('A subscriber worker shut down unexpectedly!')
+raise Exception('a subscriber worker shut down unexpectedly')
except (asyncio.CancelledError, Exception) as e:
-log.info('Subscriber exited', exc_info=e)
+log.info('subscriber exited', exc_info=e)
for task in producer_tasks:
task.cancel()
await asyncio.wait(producer_tasks,
@@ -454,4 +451,5 @@ async def subscribe(subscription: str, # pylint: disable=too-many-locals
for task in acker_tasks:
task.cancel()
await asyncio.wait(acker_tasks, return_when=asyncio.ALL_COMPLETED)
-raise asyncio.CancelledError('Subscriber shut down')
+
+raise asyncio.CancelledError('subscriber shut down')

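As a side note on the `extra={...}` fields introduced in the diff above (a hedged illustration, not part of this commit): the standard library's `logging` module attaches `extra` keys as attributes on the `LogRecord`, so a structured formatter can still surface the dropped counts and ack IDs even though they no longer vary the message text. The formatter below is a hypothetical sketch.

```python
# Hypothetical sketch: surface ``extra`` fields (e.g. count, ack_id) that the
# cleaned-up log calls attach to the LogRecord, without varying the message.
import json
import logging


class ExtraFormatter(logging.Formatter):
    # attributes every LogRecord has, plus ones Formatter.format() may add
    _STANDARD = set(vars(logging.makeLogRecord({}))) | {'message', 'asctime'}

    def format(self, record: logging.LogRecord) -> str:
        base = super().format(record)
        extras = {k: v for k, v in vars(record).items()
                  if k not in self._STANDARD}
        return f'{base} {json.dumps(extras, default=str)}' if extras else base


handler = logging.StreamHandler()
handler.setFormatter(ExtraFormatter('%(levelname)s %(message)s'))
logging.getLogger().addHandler(handler)

logging.getLogger().error(
    'acker is falling behind, dropping unacked messages', extra={'count': 3})
# -> ERROR acker is falling behind, dropping unacked messages {"count": 3}
```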