Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry on EADDRINUSE. #102

Merged
merged 2 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions hub/elastic_sync/service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import errno
import os
import json
import typing
Expand Down Expand Up @@ -55,9 +56,24 @@ def open_db(self):
)

async def run_es_notifier(self, synchronized: asyncio.Event):
server = await asyncio.get_event_loop().create_server(
lambda: ElasticNotifierProtocol(self._listeners), self.env.elastic_notifier_host, self.env.elastic_notifier_port
)
started = False
while not started:
try:
server = await asyncio.get_event_loop().create_server(
lambda: ElasticNotifierProtocol(self._listeners),
self.env.elastic_notifier_host,
self.env.elastic_notifier_port
)
started = True
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
self.log.error(f'ES notifier server failed to listen on '
f'{self.env.elastic_notifier_host}:'
f'{self.env.elastic_notifier_port:d} : {e!r}')
if isinstance(e, OSError) and e.errno is errno.EADDRINUSE:
await asyncio.sleep(3)
continue
raise
self.log.info("ES notifier server listening on TCP %s:%i", self.env.elastic_notifier_host,
self.env.elastic_notifier_port)
synchronized.set()
Expand Down
24 changes: 19 additions & 5 deletions hub/herald/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys
import math
import time
import errno
import codecs
import typing
import asyncio
Expand Down Expand Up @@ -270,9 +271,11 @@ async def _start_server(self, kind, *args, **kw_args):
host, port = args[:2]
try:
self.servers[kind] = await loop.create_server(protocol_factory, *args, **kw_args)
except OSError as e: # don't suppress CancelledError
self.logger.error(f'{kind} server failed to listen on {host}:'
f'{port:d} :{e!r}')
except Exception as e:
if not isinstance(e, asyncio.CancelledError):
self.logger.error(f'{kind} server failed to listen on '
f'{host}:{port:d} : {e!r}')
raise
else:
self.logger.info(f'{kind} server listening on {host}:{port:d}')

Expand All @@ -282,8 +285,19 @@ async def _start_external_servers(self):
"""
env = self.env
host = env.cs_host()
if env.tcp_port is not None:
await self._start_server('TCP', host, env.tcp_port)
if env.tcp_port is None:
return
started = False
while not started:
try:
await self._start_server('TCP', host, env.tcp_port)
started = True
except OSError as e:
if e.errno is errno.EADDRINUSE:
await asyncio.sleep(3)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this log as a warning? Otherwise LGTM

Copy link
Member

@jackrobison jackrobison Sep 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there other socket errors we might get? Is this needed elsewhere too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_start_server() logs at error level already.

run_es_notifier() looks like it would be vulnerable to the same thing. I have not seen it in the test failures, but that could be because tests don't stop/start elastic sync a lot.

I haven't been able to determine whether there are other transient error codes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed run_es_notifier() in a similar way. Added broad logging of Exception types.

continue
raise


async def _close_servers(self, kinds):
"""Close the servers of the given kinds (TCP etc.)."""
Expand Down