-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Graceful termination #49
Changes from 48 commits
6fe8f2a
5623ff8
33fcaa9
d50dae9
134a242
e8f7c1e
7f245d8
a74febd
ee7dfca
e3c134b
22f77e9
f488f80
9c96656
a57fcd4
dab2f2f
000ebae
dd5db42
7d15214
292e810
e1e0607
8d41825
f402cae
82747eb
ccc63c8
63aa5ed
7675ce4
3cfcf53
ae1db25
02c2bd1
9d14c81
1f91758
51fdb69
fc2941d
c2cebd6
2f4acc4
b5fffd4
abbef1d
cc726dd
8cb69b5
3fa6487
bc6a6f8
3e79db1
3591087
e3a56c3
68eccf4
35eec8d
b3986d7
9ae76cc
c59efe7
b6e6922
9411b3c
71187ad
50f8be4
0e536d7
aa8bdce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,4 +3,6 @@ __pycache__ | |
.tox | ||
*_pb2.py | ||
*_pb2_grpc.py | ||
.coverage | ||
.coverage | ||
.DS_Store | ||
.idea |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
# -*- coding: utf-8 -*- | ||
from nameko_grpc.h2_patch import patch_h2_transitions | ||
|
||
|
||
patch_h2_transitions() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
# -*- coding: utf-8 -*- | ||
import itertools | ||
import logging | ||
import select | ||
import sys | ||
from collections import deque | ||
|
@@ -8,7 +9,7 @@ | |
from threading import Event | ||
|
||
from grpc import StatusCode | ||
from h2.config import H2Configuration | ||
from h2.config import DummyLogger, H2Configuration | ||
from h2.connection import H2Connection | ||
from h2.errors import ErrorCodes | ||
from h2.events import ( | ||
|
@@ -37,9 +38,30 @@ | |
log = getLogger(__name__) | ||
|
||
|
||
class H2Logger(DummyLogger): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No interface available so subclassing the dummy logger as a stand-in |
||
""" | ||
Provide logger to H2 matching required interface | ||
""" | ||
|
||
def __init__(self, logger: logging.Logger): | ||
super().__init__() | ||
self.logger = logger | ||
|
||
def debug(self, *vargs, **kwargs): | ||
self.logger.debug(*vargs, **kwargs) | ||
|
||
def trace(self, *vargs, **kwargs): | ||
# log level below debug | ||
self.logger.log(5, *vargs, **kwargs) | ||
|
||
|
||
SELECT_TIMEOUT = 0.01 | ||
|
||
|
||
class ConnectionTerminatingError(Exception): | ||
pass | ||
|
||
|
||
class ConnectionManager: | ||
""" | ||
Base class for managing a single GRPC HTTP/2 connection. | ||
|
@@ -52,18 +74,20 @@ class ConnectionManager: | |
def __init__(self, sock, client_side): | ||
self.sock = sock | ||
|
||
config = H2Configuration(client_side=client_side) | ||
h2_logger = H2Logger(log.getChild("h2")) | ||
config = H2Configuration(client_side=client_side, logger=h2_logger) | ||
self.conn = H2Connection(config=config) | ||
|
||
self.receive_streams = {} | ||
self.send_streams = {} | ||
|
||
self.run = True | ||
self.stopped = Event() | ||
self.terminating = False | ||
|
||
@property | ||
def alive(self): | ||
return not self.stopped.is_set() | ||
return not self.stopped.is_set() and not self.terminating | ||
|
||
@contextmanager | ||
def cleanup_on_exit(self): | ||
|
@@ -83,23 +107,25 @@ def cleanup_on_exit(self): | |
if send_stream.closed: | ||
continue # stream.close() is idemponent but this prevents the log | ||
log.info( | ||
f"Terminating send stream {send_stream}" | ||
f"Terminating send stream {send_stream.stream_id}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. stream ids are much more useful than the object address |
||
f"{f' with error {error}' if error else ''}." | ||
) | ||
send_stream.close(error) | ||
for receive_stream in self.receive_streams.values(): | ||
if receive_stream.closed: | ||
continue # stream.close() is idemponent but this prevents the log | ||
log.info( | ||
f"Terminating receive stream {receive_stream}" | ||
f"Terminating receive stream {receive_stream.stream_id}" | ||
f"{f' with error {error}' if error else ''}." | ||
) | ||
receive_stream.close(error) | ||
self.sock.close() | ||
self.stopped.set() | ||
log.debug(f"connection terminated {self}") | ||
|
||
def run_forever(self): | ||
"""Event loop.""" | ||
log.debug(f"connection initiated {self}") | ||
self.conn.initiate_connection() | ||
|
||
with self.cleanup_on_exit(): | ||
|
@@ -108,6 +134,9 @@ def run_forever(self): | |
|
||
self.on_iteration() | ||
|
||
if not self.run: | ||
break | ||
|
||
self.sock.sendall(self.conn.data_to_send()) | ||
ready = select.select([self.sock], [], [], SELECT_TIMEOUT) | ||
if not ready[0]: | ||
|
@@ -142,8 +171,10 @@ def run_forever(self): | |
self.connection_terminated(event) | ||
|
||
def stop(self): | ||
self.run = False | ||
self.stopped.wait() | ||
self.conn.close_connection() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sends a go away to the server/client There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice |
||
self.terminating = True | ||
log.debug("waiting for connection to terminate (Timeout 5s)") | ||
self.stopped.wait(5) | ||
|
||
def on_iteration(self): | ||
"""Called on every iteration of the event loop. | ||
|
@@ -155,6 +186,16 @@ def on_iteration(self): | |
self.send_headers(stream_id) | ||
self.send_data(stream_id) | ||
|
||
if self.terminating: | ||
send_streams_closed = all( | ||
stream.exhausted for stream in self.send_streams.values() | ||
) | ||
receive_streams_closed = all( | ||
stream.exhausted for stream in self.receive_streams.values() | ||
) | ||
if send_streams_closed and receive_streams_closed: | ||
self.run = False | ||
|
||
def request_received(self, event): | ||
"""Called when a request is received on a stream. | ||
|
||
|
@@ -199,6 +240,7 @@ def window_updated(self, event): | |
Any data waiting to be sent on the stream may fit in the window now. | ||
""" | ||
log.debug("window updated, stream %s", event.stream_id) | ||
self.send_headers(event.stream_id) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to send headers before data if they exist. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was this a pre-existing bug that you've happened to identify as part of this work? If so, how did you identity it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. arguably an existing bug that came up through existing tests after changing the stop method to shutdown with a goway frame first. |
||
self.send_data(event.stream_id) | ||
|
||
def stream_ended(self, event): | ||
|
@@ -235,9 +277,22 @@ def trailers_received(self, event): | |
|
||
receive_stream.trailers.set(*event.headers, from_wire=True) | ||
|
||
def connection_terminated(self, event): | ||
log.debug("connection terminated") | ||
self.run = False | ||
def connection_terminated(self, event: ConnectionTerminated): | ||
"""H2 signals a connection terminated event after receiving a GOAWAY frame | ||
|
||
If no error has occurred, flag termination and initiate a graceful termination | ||
allowing existing streams to finish sending/receiving. | ||
|
||
If an error has occurred then close down immediately. | ||
""" | ||
log.debug(f"received GOAWAY with error code {event.error_code}") | ||
if event.error_code not in (ErrorCodes.NO_ERROR, ErrorCodes.ENHANCE_YOUR_CALM): | ||
log.debug("connection terminating immediately") | ||
self.terminating = True | ||
self.run = False | ||
else: | ||
log.debug("connection terminating") | ||
self.terminating = True | ||
|
||
def send_headers(self, stream_id, immediate=False): | ||
"""Attempt to send any headers on a stream. | ||
|
@@ -331,7 +386,19 @@ def send_request(self, request_headers): | |
over the response. | ||
|
||
Invocations are queued and sent on the next iteration of the event loop. | ||
|
||
raises ConnectionTerminatingError if connection is terminating. Check | ||
stephenc-pace marked this conversation as resolved.
Show resolved
Hide resolved
|
||
connection .is_alive() before initiating send_request | ||
|
||
Note: | ||
We are handling termination and raising TerminatingError here as the | ||
underlying library H2 doesn't do this. If H2 ever begins handling graceful | ||
shutdowns, this logic will need altering. | ||
""" | ||
if self.terminating: | ||
raise ConnectionTerminatingError( | ||
"Connection is terminating. No new streams can be initiated" | ||
) | ||
stream_id = next(self.counter) | ||
|
||
request_stream = SendStream(stream_id) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
# -*- coding: utf-8 -*- | ||
import logging | ||
|
||
from h2.connection import ( | ||
ConnectionInputs, | ||
ConnectionState, | ||
H2Connection, | ||
H2ConnectionStateMachine, | ||
) | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def patch_h2_transitions() -> None: | ||
""" | ||
H2 transitions immediately to closed state upon receiving a GOAWAY frame. | ||
This is out of spec and results in errors when frames are received between | ||
the GOAWAY and us (the client) terminating the connection gracefully. | ||
We still terminate the connection following a GOAWAY. | ||
https://github.com/python-hyper/h2/issues/1181 | ||
Instead of transitioning to CLOSED, remain in the current STATE and await | ||
a graceful termination. | ||
We also need to patch (noop) clear_outbound_data_buffer which would clear any | ||
outbound data. | ||
Fixes: | ||
RPC terminated with: | ||
code = StatusCode.UNAVAILABLE | ||
message = "Invalid input ConnectionInputs.RECV_PING in state ConnectionState.CLOSED" | ||
status = "code: 14 | ||
""" | ||
logger.info("H2 transitions patched for RECV_GOAWAY frame fix") | ||
|
||
patched_transitions = { | ||
# State: idle | ||
(ConnectionState.IDLE, ConnectionInputs.RECV_GOAWAY): ( | ||
None, | ||
ConnectionState.IDLE, | ||
), | ||
# State: open, client side. | ||
(ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_GOAWAY): ( | ||
None, | ||
ConnectionState.CLIENT_OPEN, | ||
), | ||
# State: open, server side. | ||
(ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_GOAWAY): ( | ||
None, | ||
ConnectionState.SERVER_OPEN, | ||
), | ||
(ConnectionState.IDLE, ConnectionInputs.SEND_GOAWAY): ( | ||
None, | ||
ConnectionState.IDLE, | ||
), | ||
(ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_GOAWAY): ( | ||
None, | ||
ConnectionState.CLIENT_OPEN, | ||
), | ||
(ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_GOAWAY): ( | ||
None, | ||
ConnectionState.SERVER_OPEN, | ||
), | ||
} | ||
|
||
H2ConnectionStateMachine._transitions.update(patched_transitions) | ||
|
||
# no op this method which is called by h2 after recieving a GO_AWAY frame | ||
def clear_outbound_data_buffer(*args, **kwargs): | ||
pass | ||
|
||
H2Connection.clear_outbound_data_buffer = clear_outbound_data_buffer |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,8 +57,11 @@ def __init__(self, stream_id): | |
def exhausted(self): | ||
"""A stream is exhausted if it is closed and there are no more messages to be | ||
consumed or bytes to be read. | ||
|
||
When a stream is closed we append the STREAM_END item or a GrpcError, so an exhausted | ||
stream will possibly still have 1 item left in the queue, so we must check for that. | ||
""" | ||
return self.closed and self.queue.empty() and self.buffer.empty() | ||
return self.closed and self.queue.qsize() in (0, 1) and self.buffer.empty() | ||
stephenc-pace marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def close(self, error=None): | ||
"""Close this stream, preventing further messages or data to be added. | ||
|
@@ -70,7 +73,7 @@ def close(self, error=None): | |
in race conditions between timeout threads, connection teardown, and the | ||
natural termination of streams. | ||
|
||
SendStreams have an additional race condition beteen the end of the iterator | ||
SendStreams have an additional race condition between the end of the iterator | ||
and the StreamEnded event received from the remote side. | ||
|
||
An error is only raised if the first invocation happened due to an error. | ||
|
@@ -166,7 +169,7 @@ def headers_to_send(self, defer_until_data=True): | |
if self.headers_sent or len(self.headers) == 0: | ||
return False | ||
|
||
if defer_until_data and self.queue.empty(): | ||
if defer_until_data and self.queue.empty() and self.buffer.empty(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if flush_to_buffer is called then the queue may be empty while data sits in the buffer. |
||
return False | ||
|
||
self.headers_sent = True | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,6 @@ | |
|
||
import pytest | ||
from grpc import StatusCode | ||
from h2.events import ConnectionTerminated | ||
from nameko import config | ||
|
||
from nameko_grpc.constants import Cardinality | ||
|
@@ -360,19 +359,3 @@ def test_invalid_request(self, client, protobufs): | |
client.unary_unary(protobufs.ExampleRequest(value="hello")) | ||
assert error.value.code == StatusCode.INTERNAL | ||
assert error.value.message == "Exception deserializing request!" | ||
|
||
|
||
class TestErrorStreamClosed: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No longer required as not exceptional any more. |
||
@pytest.fixture(params=["client=nameko"]) | ||
def client_type(self, request): | ||
return request.param[7:] | ||
|
||
def test_response_stream_closed(self, client, protobufs): | ||
with mock.patch( | ||
"h2.connection.H2Connection.receive_data", | ||
return_value=[ConnectionTerminated()], | ||
): | ||
with pytest.raises(GrpcError) as error: | ||
client.unary_unary(protobufs.ExampleRequest(value="hello")) | ||
assert error.value.code == StatusCode.UNAVAILABLE | ||
assert error.value.message == "Stream was closed mid-request" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These were missing from the init. There's a few more cases of this but not fixing everything.