diff --git a/sanic/server/protocols/websocket_protocol.py b/sanic/server/protocols/websocket_protocol.py index 106c677244..fdd931bdda 100644 --- a/sanic/server/protocols/websocket_protocol.py +++ b/sanic/server/protocols/websocket_protocol.py @@ -80,7 +80,7 @@ def close_if_idle(self): # Called by Sanic Server when shutting down # If we've upgraded to websocket, shut it down if self.websocket is not None: - if self.websocket.connection.state in (CLOSING, CLOSED): + if self.websocket.ws_proto.state in (CLOSING, CLOSED): return True elif self.websocket.loop is not None: self.websocket.loop.create_task(self.websocket.close(1001)) diff --git a/sanic/server/websockets/impl.py b/sanic/server/websockets/impl.py index ff9c55eea4..c15db1710b 100644 --- a/sanic/server/websockets/impl.py +++ b/sanic/server/websockets/impl.py @@ -37,7 +37,7 @@ class WebsocketImplProtocol: - connection: ServerProtocol + ws_proto: ServerProtocol io_proto: Optional[SanicProtocol] loop: Optional[asyncio.AbstractEventLoop] max_queue: int @@ -63,14 +63,14 @@ class WebsocketImplProtocol: def __init__( self, - connection, + ws_proto, max_queue=None, ping_interval: Optional[float] = 20, ping_timeout: Optional[float] = 20, close_timeout: float = 10, loop=None, ): - self.connection = connection + self.ws_proto = ws_proto self.io_proto = None self.loop = None self.max_queue = max_queue @@ -92,7 +92,7 @@ def __init__( @property def subprotocol(self): - return self.connection.subprotocol + return self.ws_proto.subprotocol def pause_frames(self): if not self.can_pause: @@ -306,15 +306,15 @@ def fail_connection(self, code: int = 1006, reason: str = "") -> bool: # Not draining the write buffer is acceptable in this context. # clear the send buffer - _ = self.connection.data_to_send() + _ = self.ws_proto.data_to_send() # If we're not already CLOSED or CLOSING, then send the close. - if self.connection.state is OPEN: + if self.ws_proto.state is OPEN: if code in (1000, 1001): - self.connection.send_close(code, reason) + self.ws_proto.send_close(code, reason) else: - self.connection.fail(code, reason) + self.ws_proto.fail(code, reason) try: - data_to_send = self.connection.data_to_send() + data_to_send = self.ws_proto.data_to_send() while ( len(data_to_send) and self.io_proto @@ -328,7 +328,7 @@ def fail_connection(self, code: int = 1006, reason: str = "") -> bool: ... if code == 1006: # Special case: 1006 consider the transport already closed - self.connection.state = CLOSED + self.ws_proto.state = CLOSED if self.data_finished_fut and not self.data_finished_fut.done(): # We have a graceful auto-closer. Use it to close the connection. self.data_finished_fut.cancel() @@ -349,10 +349,10 @@ def end_connection(self, code=1000, reason=""): # In Python Version 3.7: pause_reading is idempotent # i.e. it can be called when the transport is already paused or closed. self.io_proto.transport.pause_reading() - if self.connection.state == OPEN: - data_to_send = self.connection.data_to_send() - self.connection.send_close(code, reason) - data_to_send.extend(self.connection.data_to_send()) + if self.ws_proto.state == OPEN: + data_to_send = self.ws_proto.data_to_send() + self.ws_proto.send_close(code, reason) + data_to_send.extend(self.ws_proto.data_to_send()) try: while ( len(data_to_send) @@ -461,7 +461,7 @@ def abort_pings(self) -> None: Raise ConnectionClosed in pending keepalive pings. They'll never receive a pong once the connection is closed. """ - if self.connection.state is not CLOSED: + if self.ws_proto.state is not CLOSED: raise ServerError( "Webscoket about_pings should only be called " "after connection state is changed to CLOSED" @@ -490,9 +490,9 @@ async def close(self, code: int = 1000, reason: str = "") -> None: self.fail_connection(code, reason) return async with self.conn_mutex: - if self.connection.state is OPEN: - self.connection.send_close(code, reason) - data_to_send = self.connection.data_to_send() + if self.ws_proto.state is OPEN: + self.ws_proto.send_close(code, reason) + data_to_send = self.ws_proto.data_to_send() await self.send_data(data_to_send) async def recv(self, timeout: Optional[float] = None) -> Optional[Data]: @@ -522,7 +522,7 @@ async def recv(self, timeout: Optional[float] = None) -> Optional[Data]: "already waiting for the next message" ) await self.recv_lock.acquire() - if self.connection.state is CLOSED: + if self.ws_proto.state is CLOSED: self.recv_lock.release() raise WebsocketClosed( "Cannot receive from websocket interface after it is closed." @@ -573,7 +573,7 @@ async def recv_burst(self, max_recv=256) -> Sequence[Data]: "for the next message" ) await self.recv_lock.acquire() - if self.connection.state is CLOSED: + if self.ws_proto.state is CLOSED: self.recv_lock.release() raise WebsocketClosed( "Cannot receive from websocket interface after it is closed." @@ -632,7 +632,7 @@ async def recv_streaming(self) -> AsyncIterator[Data]: "is already waiting for the next message" ) await self.recv_lock.acquire() - if self.connection.state is CLOSED: + if self.ws_proto.state is CLOSED: self.recv_lock.release() raise WebsocketClosed( "Cannot receive from websocket interface after it is closed." @@ -673,7 +673,7 @@ async def send(self, message: Union[Data, Iterable[Data]]) -> None: """ async with self.conn_mutex: - if self.connection.state in (CLOSED, CLOSING): + if self.ws_proto.state in (CLOSED, CLOSING): raise WebsocketClosed( "Cannot write to websocket interface after it is closed." ) @@ -686,12 +686,12 @@ async def send(self, message: Union[Data, Iterable[Data]]) -> None: # strings and bytes-like objects are iterable. if isinstance(message, str): - self.connection.send_text(message.encode("utf-8")) - await self.send_data(self.connection.data_to_send()) + self.ws_proto.send_text(message.encode("utf-8")) + await self.send_data(self.ws_proto.data_to_send()) elif isinstance(message, (bytes, bytearray, memoryview)): - self.connection.send_binary(message) - await self.send_data(self.connection.data_to_send()) + self.ws_proto.send_binary(message) + await self.send_data(self.ws_proto.data_to_send()) elif isinstance(message, Mapping): # Catch a common mistake -- passing a dict to send(). @@ -720,7 +720,7 @@ async def ping(self, data: Optional[Data] = None) -> asyncio.Future: (which will be encoded to UTF-8) or a bytes-like object. """ async with self.conn_mutex: - if self.connection.state in (CLOSED, CLOSING): + if self.ws_proto.state in (CLOSED, CLOSING): raise WebsocketClosed( "Cannot send a ping when the websocket interface " "is closed." @@ -748,8 +748,8 @@ async def ping(self, data: Optional[Data] = None) -> asyncio.Future: self.pings[data] = self.io_proto.loop.create_future() - self.connection.send_ping(data) - await self.send_data(self.connection.data_to_send()) + self.ws_proto.send_ping(data) + await self.send_data(self.ws_proto.data_to_send()) return asyncio.shield(self.pings[data]) @@ -761,15 +761,15 @@ async def pong(self, data: Data = b"") -> None: be a string (which will be encoded to UTF-8) or a bytes-like object. """ async with self.conn_mutex: - if self.connection.state in (CLOSED, CLOSING): + if self.ws_proto.state in (CLOSED, CLOSING): # Cannot send pong after transport is shutting down return if isinstance(data, str): data = data.encode("utf-8") elif isinstance(data, (bytearray, memoryview)): data = bytes(data) - self.connection.send_pong(data) - await self.send_data(self.connection.data_to_send()) + self.ws_proto.send_pong(data) + await self.send_data(self.ws_proto.data_to_send()) async def send_data(self, data_to_send): for data in data_to_send: @@ -791,7 +791,7 @@ async def send_data(self, data_to_send): SanicProtocol.close(self.io_proto, timeout=1.0) async def async_data_received(self, data_to_send, events_to_process): - if self.connection.state in (OPEN, CLOSING) and len(data_to_send) > 0: + if self.ws_proto.state in (OPEN, CLOSING) and len(data_to_send) > 0: # receiving data can generate data to send (eg, pong for a ping) # send connection.data_to_send() await self.send_data(data_to_send) @@ -799,9 +799,9 @@ async def async_data_received(self, data_to_send, events_to_process): await self.process_events(events_to_process) def data_received(self, data): - self.connection.receive_data(data) - data_to_send = self.connection.data_to_send() - events_to_process = self.connection.events_received() + self.ws_proto.receive_data(data) + data_to_send = self.ws_proto.data_to_send() + events_to_process = self.ws_proto.events_received() if len(data_to_send) > 0 or len(events_to_process) > 0: asyncio.create_task( self.async_data_received(data_to_send, events_to_process) @@ -810,7 +810,7 @@ def data_received(self, data): async def async_eof_received(self, data_to_send, events_to_process): # receiving EOF can generate data to send # send connection.data_to_send() - if self.connection.state in (OPEN, CLOSING) and len(data_to_send) > 0: + if self.ws_proto.state in (OPEN, CLOSING) and len(data_to_send) > 0: await self.send_data(data_to_send) if len(events_to_process) > 0: await self.process_events(events_to_process) @@ -830,9 +830,9 @@ async def async_eof_received(self, data_to_send, events_to_process): SanicProtocol.close(self.io_proto, timeout=1.0) def eof_received(self) -> Optional[bool]: - self.connection.receive_eof() - data_to_send = self.connection.data_to_send() - events_to_process = self.connection.events_received() + self.ws_proto.receive_eof() + data_to_send = self.ws_proto.data_to_send() + events_to_process = self.ws_proto.events_received() asyncio.create_task( self.async_eof_received(data_to_send, events_to_process) ) @@ -842,11 +842,11 @@ def connection_lost(self, exc): """ The WebSocket Connection is Closed. """ - if not self.connection.state == CLOSED: + if not self.ws_proto.state == CLOSED: # signal to the websocket connection handler # we've lost the connection - self.connection.fail(code=1006) - self.connection.state = CLOSED + self.ws_proto.fail(code=1006) + self.ws_proto.state = CLOSED self.abort_pings() if self.connection_lost_waiter: