Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server lifecycle #1497

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
7 changes: 2 additions & 5 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import os
from contextlib import asynccontextmanager, contextmanager
from pathlib import Path
Expand All @@ -8,14 +7,12 @@

@asynccontextmanager
async def run_server(config: Config, sockets=None):
server = Server(config=config)
cancel_handle = asyncio.ensure_future(server.serve(sockets=sockets))
await asyncio.sleep(0.1)
server = Server(config=config, sockets=sockets)
await server.start_serving()
try:
yield server
finally:
await server.shutdown()
cancel_handle.cancel()
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's worth pointing this bit out as it suffers from all of the same problems that our test fixtures have suffered from:

  • asyncio.sleep(0.1) instead of checking for startup properly.
  • Two handles required to be able to shutdown the service fully.
  • server.shutdown() not actually killing the server main loop.

I think it's a good demonstration of my motivation for proposing these changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep pretty interesting, unfortunate I cant dedicate more time on this, the only real "interrogation" I have it the interaction it may have with --reload and --workers

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell, there shouldn't be any interaction. Both just call server.run in a subprocess and kill the process by sending the appropriate signal. There doesn't appear to be much difference from running without multiple workers or a watcher, which is as it should be.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

going to play with this but re-reading it I like it,
@florimondmanca what are your thoughts on this, you extensively played with this a while ago, your input should be very interesting



@contextmanager
Expand Down
88 changes: 81 additions & 7 deletions uvicorn/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ def __init__(self) -> None:


class Server:
def __init__(self, config: Config) -> None:
def __init__(
self, config: Config, *, sockets: Optional[List[socket.socket]] = None
) -> None:
self.config = config
self.server_state = ServerState()

Expand All @@ -55,11 +57,19 @@ def __init__(self, config: Config) -> None:
self.force_exit = False
self.last_notified = 0.0

self._sockets: Optional[List[socket.socket]] = sockets

self._main_task: Optional[asyncio.Task] = None

#: Created on demand and set once immediately after startup has
#: completed and the server has started listening for requests.
self._startup_event: Optional[asyncio.Event] = None

def run(self, sockets: Optional[List[socket.socket]] = None) -> None:
self.config.setup_event_loop()
return asyncio.run(self.serve(sockets=sockets))

async def serve(self, sockets: Optional[List[socket.socket]] = None) -> None:
async def _main(self) -> None:
process_id = os.getpid()

config = self.config
Expand All @@ -68,22 +78,62 @@ async def serve(self, sockets: Optional[List[socket.socket]] = None) -> None:

self.lifespan = config.lifespan_class(config)

self.install_signal_handlers()

message = "Started server process [%d]"
color_message = "Started server process [" + click.style("%d", fg="cyan") + "]"
logger.info(message, process_id, extra={"color_message": color_message})

await self.startup(sockets=sockets)
await self.startup(sockets=self._sockets)
if self.should_exit:
return
await self.main_loop()
await self.shutdown(sockets=sockets)
await self._shutdown(sockets=self._sockets)

message = "Finished server process [%d]"
color_message = "Finished server process [" + click.style("%d", fg="cyan") + "]"
logger.info(message, process_id, extra={"color_message": color_message})

async def start_serving(self) -> None:
"""
Starts the server running in a background task and blocks until startup
is either fully complete, or fails.

Idempotent. Can be called multiple times without creating multiple
instances.
"""
if self._startup_event is None:
# We defer creating the startup event until start serving is called
# because there is no guarantee that the constructor will be called
# with an active loop.
self._startup_event = asyncio.Event()

if self._main_task is None:
self._main_task = asyncio.create_task(self._main())

# If the main task exits before the startup event is set it means that
# startup has failed.
await asyncio.wait(
[asyncio.create_task(self._startup_event.wait()), self._main_task],
return_when=asyncio.FIRST_COMPLETED,
)

async def serve(self, sockets: Optional[List[socket.socket]] = None) -> None:
if self._main_task is not None:
raise RuntimeError("cannot call serve on running server")

if sockets is not None:
if self._sockets is not None:
raise RuntimeError("cannot override already provided sockets list")
self._sockets = sockets

self.install_signal_handlers()
try:
await self.start_serving()
await self.wait_closed()

except asyncio.CancelledError:
self.close()
await self.wait_closed()

async def startup(self, sockets: list = None) -> None:
await self.lifespan.startup()
if self.lifespan.should_exit:
Expand Down Expand Up @@ -171,6 +221,9 @@ def _share_socket(sock: socket.SocketType) -> socket.SocketType:

self.started = True

assert self._startup_event is not None
self._startup_event.set()

def _log_started_message(self, listeners: Sequence[socket.SocketType]) -> None:
config = self.config

Expand Down Expand Up @@ -249,7 +302,7 @@ async def on_tick(self, counter: int) -> bool:
return self.server_state.total_requests >= self.config.limit_max_requests
return False

async def shutdown(self, sockets: Optional[List[socket.socket]] = None) -> None:
async def _shutdown(self, sockets: Optional[List[socket.socket]] = None) -> None:
logger.info("Shutting down")

# Stop accepting new connections.
Expand Down Expand Up @@ -283,6 +336,27 @@ async def shutdown(self, sockets: Optional[List[socket.socket]] = None) -> None:
if not self.force_exit:
await self.lifespan.shutdown()

def close(self, *, force_exit: bool = False) -> None:
"""
Asks the server, asynchronously, to initiate shutdown.
It should be safe to call this from a request handler.
"""
self.should_exit = True
if force_exit and not self.force_exit:
self.force_exit = True

async def wait_closed(self) -> None:
"""
Blocks until the server is completely shutdown.
"""
if self._main_task is None:
raise RuntimeError("Server hasn't been started")
await self._main_task

async def shutdown(self) -> None:
self.close()
await self.wait_closed()

def install_signal_handlers(self) -> None:
if threading.current_thread() is not threading.main_thread():
# Signals can only be listened to from the main thread.
Expand Down