-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Graceful termination #49
Changes from all commits
6fe8f2a
5623ff8
33fcaa9
d50dae9
134a242
e8f7c1e
7f245d8
a74febd
ee7dfca
e3c134b
22f77e9
f488f80
9c96656
a57fcd4
dab2f2f
000ebae
dd5db42
7d15214
292e810
e1e0607
8d41825
f402cae
82747eb
ccc63c8
63aa5ed
7675ce4
3cfcf53
ae1db25
02c2bd1
9d14c81
1f91758
51fdb69
fc2941d
c2cebd6
2f4acc4
b5fffd4
abbef1d
cc726dd
8cb69b5
3fa6487
bc6a6f8
3e79db1
3591087
e3a56c3
68eccf4
35eec8d
b3986d7
9ae76cc
c59efe7
b6e6922
9411b3c
71187ad
50f8be4
0e536d7
aa8bdce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,4 +3,6 @@ __pycache__ | |
.tox | ||
*_pb2.py | ||
*_pb2_grpc.py | ||
.coverage | ||
.coverage | ||
.DS_Store | ||
.idea |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
# -*- coding: utf-8 -*- | ||
from nameko_grpc.h2_patch import patch_h2_transitions | ||
|
||
|
||
patch_h2_transitions() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
# -*- coding: utf-8 -*- | ||
import itertools | ||
import logging | ||
import select | ||
import sys | ||
from collections import deque | ||
|
@@ -8,7 +9,7 @@ | |
from threading import Event | ||
|
||
from grpc import StatusCode | ||
from h2.config import H2Configuration | ||
from h2.config import DummyLogger, H2Configuration | ||
from h2.connection import H2Connection | ||
from h2.errors import ErrorCodes | ||
from h2.events import ( | ||
|
@@ -37,9 +38,30 @@ | |
log = getLogger(__name__) | ||
|
||
|
||
class H2Logger(DummyLogger): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No interface available so subclassing the dummy logger as a stand-in |
||
""" | ||
Provide logger to H2 matching required interface | ||
""" | ||
|
||
def __init__(self, logger: logging.Logger): | ||
super().__init__() | ||
self.logger = logger | ||
|
||
def debug(self, *vargs, **kwargs): | ||
self.logger.debug(*vargs, **kwargs) | ||
|
||
def trace(self, *vargs, **kwargs): | ||
# log level below debug | ||
self.logger.log(5, *vargs, **kwargs) | ||
|
||
|
||
SELECT_TIMEOUT = 0.01 | ||
|
||
|
||
class ConnectionTerminatingError(Exception): | ||
pass | ||
|
||
|
||
class ConnectionManager: | ||
""" | ||
Base class for managing a single GRPC HTTP/2 connection. | ||
|
@@ -49,21 +71,24 @@ class ConnectionManager: | |
by subclasses. | ||
""" | ||
|
||
def __init__(self, sock, client_side): | ||
def __init__(self, sock, client_side, max_concurrent_streams=100): | ||
self.sock = sock | ||
|
||
config = H2Configuration(client_side=client_side) | ||
h2_logger = H2Logger(log.getChild("h2")) | ||
config = H2Configuration(client_side=client_side, logger=h2_logger) | ||
self.conn = H2Connection(config=config) | ||
self.conn.local_settings.max_concurrent_streams = max_concurrent_streams | ||
|
||
self.receive_streams = {} | ||
self.send_streams = {} | ||
|
||
self.run = True | ||
self.stopped = Event() | ||
self.terminating = False | ||
|
||
@property | ||
def alive(self): | ||
return not self.stopped.is_set() | ||
return not self.stopped.is_set() and not self.terminating | ||
|
||
@contextmanager | ||
def cleanup_on_exit(self): | ||
|
@@ -83,23 +108,25 @@ def cleanup_on_exit(self): | |
if send_stream.closed: | ||
continue # stream.close() is idemponent but this prevents the log | ||
log.info( | ||
f"Terminating send stream {send_stream}" | ||
f"Terminating send stream {send_stream.stream_id}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. stream ids are much more useful than the object address |
||
f"{f' with error {error}' if error else ''}." | ||
) | ||
send_stream.close(error) | ||
for receive_stream in self.receive_streams.values(): | ||
if receive_stream.closed: | ||
continue # stream.close() is idemponent but this prevents the log | ||
log.info( | ||
f"Terminating receive stream {receive_stream}" | ||
f"Terminating receive stream {receive_stream.stream_id}" | ||
f"{f' with error {error}' if error else ''}." | ||
) | ||
receive_stream.close(error) | ||
self.sock.close() | ||
self.stopped.set() | ||
log.debug(f"connection terminated {self}") | ||
|
||
def run_forever(self): | ||
"""Event loop.""" | ||
log.debug(f"connection initiated {self}") | ||
self.conn.initiate_connection() | ||
|
||
with self.cleanup_on_exit(): | ||
|
@@ -108,6 +135,9 @@ def run_forever(self): | |
|
||
self.on_iteration() | ||
|
||
if not self.run: | ||
break | ||
|
||
self.sock.sendall(self.conn.data_to_send()) | ||
ready = select.select([self.sock], [], [], SELECT_TIMEOUT) | ||
if not ready[0]: | ||
|
@@ -142,8 +172,10 @@ def run_forever(self): | |
self.connection_terminated(event) | ||
|
||
def stop(self): | ||
self.run = False | ||
self.stopped.wait() | ||
self.conn.close_connection() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sends a go away to the server/client There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice |
||
self.terminating = True | ||
log.debug("waiting for connection to terminate (Timeout 5s)") | ||
self.stopped.wait(5) | ||
|
||
def on_iteration(self): | ||
"""Called on every iteration of the event loop. | ||
|
@@ -155,6 +187,16 @@ def on_iteration(self): | |
self.send_headers(stream_id) | ||
self.send_data(stream_id) | ||
|
||
if self.terminating: | ||
send_streams_closed = all( | ||
stream.exhausted for stream in self.send_streams.values() | ||
) | ||
receive_streams_closed = all( | ||
stream.exhausted for stream in self.receive_streams.values() | ||
) | ||
if send_streams_closed and receive_streams_closed: | ||
self.run = False | ||
|
||
def request_received(self, event): | ||
"""Called when a request is received on a stream. | ||
|
||
|
@@ -199,6 +241,7 @@ def window_updated(self, event): | |
Any data waiting to be sent on the stream may fit in the window now. | ||
""" | ||
log.debug("window updated, stream %s", event.stream_id) | ||
self.send_headers(event.stream_id) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to send headers before data if they exist. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was this a pre-existing bug that you've happened to identify as part of this work? If so, how did you identity it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. arguably an existing bug that came up through existing tests after changing the stop method to shutdown with a goway frame first. |
||
self.send_data(event.stream_id) | ||
|
||
def stream_ended(self, event): | ||
|
@@ -210,16 +253,22 @@ def stream_ended(self, event): | |
receive_stream = self.receive_streams.pop(event.stream_id, None) | ||
if receive_stream: | ||
receive_stream.close() | ||
# send_stream = self.send_streams.pop(event.stream_id, None) | ||
# if send_stream: | ||
# send_stream.close() | ||
|
||
def stream_reset(self, event): | ||
"""Called when an incoming stream is reset. | ||
|
||
Close any `ReceiveStream` that was opened for this stream. | ||
Close any `ReceiveStream` or `SendStream` that was opened for this stream. | ||
""" | ||
log.debug("stream reset, stream %s", event.stream_id) | ||
receive_stream = self.receive_streams.pop(event.stream_id, None) | ||
if receive_stream: | ||
receive_stream.close() | ||
send_stream = self.send_streams.pop(event.stream_id, None) | ||
if send_stream: | ||
send_stream.close() | ||
|
||
def settings_changed(self, event): | ||
log.debug("settings changed") | ||
|
@@ -235,9 +284,22 @@ def trailers_received(self, event): | |
|
||
receive_stream.trailers.set(*event.headers, from_wire=True) | ||
|
||
def connection_terminated(self, event): | ||
log.debug("connection terminated") | ||
self.run = False | ||
def connection_terminated(self, event: ConnectionTerminated): | ||
"""H2 signals a connection terminated event after receiving a GOAWAY frame | ||
|
||
If no error has occurred, flag termination and initiate a graceful termination | ||
allowing existing streams to finish sending/receiving. | ||
|
||
If an error has occurred then close down immediately. | ||
""" | ||
log.debug(f"received GOAWAY with error code {event.error_code}") | ||
if event.error_code not in (ErrorCodes.NO_ERROR, ErrorCodes.ENHANCE_YOUR_CALM): | ||
log.debug("connection terminating immediately") | ||
self.terminating = True | ||
self.run = False | ||
else: | ||
log.debug("connection terminating") | ||
self.terminating = True | ||
|
||
def send_headers(self, stream_id, immediate=False): | ||
"""Attempt to send any headers on a stream. | ||
|
@@ -269,6 +331,14 @@ def send_data(self, stream_id): | |
# has been completely sent | ||
return | ||
|
||
# When a stream is closed, a STREAM_END item or ERROR is placed in the queue. | ||
# If we never read from the stream again, these are not surfaced, and the | ||
# stream is never exhausted. | ||
# Because we shortcut sending data if headers haven't been set yet, we need | ||
# to manually flush the queue, surfacing the end/error, and ensuring the | ||
# queue exhausts (and we can terminate). | ||
send_stream.flush_queue_to_buffer() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice find. Can you add a regression test case for this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will have a look. It came up through existing tests so I'll work out which ones cover it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, not easily. I added it as the tests were timing out (hanging) without it but removing it now and they appear to be passing still. I think it still makes sense to have it but it's tricky to test specifically. |
||
|
||
if not send_stream.headers_sent: | ||
# don't attempt to send any data until the headers have been sent | ||
return | ||
|
@@ -331,7 +401,19 @@ def send_request(self, request_headers): | |
over the response. | ||
|
||
Invocations are queued and sent on the next iteration of the event loop. | ||
|
||
raises ConnectionTerminatingError if connection is terminating. Check | ||
stephenc-pace marked this conversation as resolved.
Show resolved
Hide resolved
|
||
connection .is_alive() before initiating send_request | ||
|
||
Note: | ||
We are handling termination and raising TerminatingError here as the | ||
underlying library H2 doesn't do this. If H2 ever begins handling graceful | ||
shutdowns, this logic will need altering. | ||
""" | ||
if self.terminating: | ||
raise ConnectionTerminatingError( | ||
"Connection is terminating. No new streams can be initiated" | ||
) | ||
stream_id = next(self.counter) | ||
|
||
request_stream = SendStream(stream_id) | ||
|
@@ -427,8 +509,10 @@ class ServerConnectionManager(ConnectionManager): | |
Extends the base `ConnectionManager` to handle incoming GRPC requests. | ||
""" | ||
|
||
def __init__(self, sock, handle_request): | ||
super().__init__(sock, client_side=False) | ||
def __init__(self, sock, handle_request, max_concurrent_streams=100): | ||
super().__init__( | ||
sock, client_side=False, max_concurrent_streams=max_concurrent_streams | ||
) | ||
self.handle_request = handle_request | ||
|
||
def request_received(self, event): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These were missing from the init. There's a few more cases of this but not fixing everything.