Python Async SignalR DotNetCore Client
This library implements the SignalR JSON protocol using asyncio. Note: Streaming is currently not supported.
from async_signalr_client.connection import Connection
async def main():
# This will create the connection instance but will not launch any connection
connection = Connection("ws://127.0.0.1:5000/chat")
# This will establish the connection and negotiate protocol
await connection.start()
Connection Parameters:
- url --> Url of the hub (Note: use ws(s) scheme for Websocket Transport)
- transport -->
client.transport.WebSocketTransport
(Default) orclient.transport.LongPollingTransport
- protocol -->
client.protocol.JsonProtocol()
(Default) - connection_timeout --> Timeout connection when no ping is received for the given interval in seconds
- log_level --> Standard library LogLevel
from async_signalr_client.connection import Connection
async def main():
connection = Connection("ws://127.0.0.1:5000/chat")
await connection.start()
# Sends a message to target foo with 1 string parameter
await connection.invoke("foo", "bar")
from async_signalr_client.connection import Connection
from async_signalr_client.exceptions import SignalRCompletionServerError
async def main():
connection = Connection("ws://127.0.0.1:5000/chat")
await connection.start()
completion = await connection.invoke("foo", "bar")
# ... perform other tasks ...
try:
# Wait until completion is received
# Retrieve completion payload
await completion
# Note: If completions returns an error "SignalRCompletionServerError" will be raised
print(completion.result())
except SignalRCompletionServerError as e:
print(e)
import asyncio
from async_signalr_client.connection import Connection
class ChatClient(Connection):
async def activity(self):
await self.start()
# Using the prefix "on_" the handler is automatically added
async def on_foo(self, bar):
print(f"Received Foo: bar={bar}")
async def test(bar):
print(f"Received Foo: bar={bar}")
connection = ChatClient("ws://127.0.0.1:5000/chat")
# Register a handler for foo
connection.on("foo", test)
loop = asyncio.get_event_loop()
loop.create_task(connection.activity())
loop.run_forever()