- Django Channels based WebSocket GraphQL server with Graphene-like subscriptions
- WebSocket-based GraphQL server implemented on the Django Channels v2.
- WebSocket protocol is compatible with Apollo GraphQL client.
- Graphene-like subscriptions.
- All GraphQL requests are processed concurrently (in parallel).
- Subscription notifications delivered in the order they were issued.
- Optional subscription activation message can be sent to a client. This is useful to avoid race conditions on the client side. Consider the case when client subscribes to some subscription and immediately invokes a mutations which triggers this subscription. In such case the subscription notification can be lost, cause these subscription and mutation requests are processed concurrently. To avoid this client shall wait for the subscription activation message before sending such mutation request.
- Customizable notification strategies:
- A subscription can be put to one or many subscription groups. This allows to granularly notify only selected clients, or, looking from the client's perspective - to subscribe to some selected source of events. For example, imaginary subscription "OnNewMessage" may accept argument "user" so subscription will only trigger on new messages from the selected user.
- Notification can be suppressed in the subscription resolver method
publish
. For example, this is useful to avoid sending self-notifications.
- All GraphQL "resolvers" run in a threadpool so they never block the server itself and may communicate with database or perform other blocking tasks.
- Resolvers (including subscription's
subscribe
&publish
) can be represented both as synchronous or asynchronous (async def
) methods. - Subscription notifications can be sent from both synchronous and
asynchronous contexts. Just call
MySubscription.broadcast()
orawait MySubscription.broadcast()
depending on the context. - Clients for the GraphQL WebSocket server:
- AIOHTTP-based client.
- Client for unit test based on the Django Channels testing communicator.
- Supported Python 3.6 and newer (tests run on 3.6, 3.7, and 3.8).
- Works on Linux, macOS, and Windows.
pip install django-channels-graphql-ws
Create a GraphQL schema using Graphene. Note the MySubscription
class.
import channels_graphql_ws
import graphene
class MySubscription(channels_graphql_ws.Subscription):
"""Simple GraphQL subscription."""
# Leave only latest 64 messages in the server queue.
notification_queue_limit = 64
# Subscription payload.
event = graphene.String()
class Arguments:
"""That is how subscription arguments are defined."""
arg1 = graphene.String()
arg2 = graphene.String()
@staticmethod
def subscribe(root, info, arg1, arg2):
"""Called when user subscribes."""
# Return the list of subscription group names.
return ["group42"]
@staticmethod
def publish(payload, info, arg1, arg2):
"""Called to notify the client."""
# Here `payload` contains the `payload` from the `broadcast()`
# invocation (see below). You can return `MySubscription.SKIP`
# if you wish to suppress the notification to a particular
# client. For example, this allows to avoid notifications for
# the actions made by this particular client.
return MySubscription(event="Something has happened!")
class Query(graphene.ObjectType):
"""Root GraphQL query."""
# Check Graphene docs to see how to define queries.
pass
class Mutation(graphene.ObjectType):
"""Root GraphQL mutation."""
# Check Graphene docs to see how to define mutations.
pass
class Subscription(graphene.ObjectType):
"""Root GraphQL subscription."""
my_subscription = MySubscription.Field()
graphql_schema = graphene.Schema(
query=Query,
mutation=Mutation,
subscription=Subscription,
)
Make your own WebSocket consumer subclass and set the schema it serves:
class MyGraphqlWsConsumer(channels_graphql_ws.GraphqlWsConsumer):
"""Channels WebSocket consumer which provides GraphQL API."""
schema = graphql_schema
# Uncomment to send keepalive message every 42 seconds.
# send_keepalive_every = 42
# Uncomment to process requests sequentially (useful for tests).
# strict_ordering = True
async def on_connect(self, payload):
"""New client connection handler."""
# You can `raise` from here to reject the connection.
print("New client connected!")
Setup Django Channels routing:
application = channels.routing.ProtocolTypeRouter({
"websocket": channels.routing.URLRouter([
django.urls.path("graphql/", MyGraphqlWsConsumer.as_asgi()),
])
})
Notify﹡ clients when some event happens using
the broadcast()
or broadcast_sync()
method from the OS thread where
there is no running event loop:
MySubscription.broadcast(
# Subscription group to notify clients in.
group="group42",
# Dict delivered to the `publish` method.
payload={},
)
Notify﹡ clients in an coroutine function
using the broadcast()
or broadcast_async()
method:
await MySubscription.broadcast(
# Subscription group to notify clients in.
group="group42",
# Dict delivered to the `publish` method.
payload={},
)
﹡) In case you are testing your client code by notifying it from the Django Shell, you have to setup a channel layer in order for the two instance of your application. The same applies in production with workers.
You can find simple usage example in the example directory.
Run:
cd example/
# Initialize database.
./manage.py migrate
# Create "user" with password "user".
./manage.py createsuperuser
# Run development server.
./manage.py runserver
Play with the API though the GraphiQL browser at http://127.0.0.1:8000.
You can start with the following GraphQL requests:
# Check there are no messages.
query read { history(chatroom: "kittens") { chatroom text sender }}
# Send a message as Anonymous.
mutation send { sendChatMessage(chatroom: "kittens", text: "Hi all!"){ ok }}
# Check there is a message from `Anonymous`.
query read { history(chatroom: "kittens") { text sender } }
# Login as `user`.
mutation send { login(username: "user", password: "pass") { ok } }
# Send a message as a `user`.
mutation send { sendChatMessage(chatroom: "kittens", text: "It is me!"){ ok }}
# Check there is a message from both `Anonymous` and from `user`.
query read { history(chatroom: "kittens") { text sender } }
# Subscribe, do this from a separate browser tab, it waits for events.
subscription s { onNewChatMessage(chatroom: "kittens") { text sender }}
# Send something again to check subscription triggers.
mutation send { sendChatMessage(chatroom: "kittens", text: "Something ;-)!"){ ok }}
The channels_graphql_ws
module provides the following key classes:
GraphqlWsConsumer
: Django Channels WebSocket consumer which maintains WebSocket connection with the client.Subscription
: Subclass this to define GraphQL subscription. Very similar to defining mutations with Graphene. (The class itself is a "creative" copy of the GrapheneMutation
class.)GraphqlWsClient
: A client for the GraphQL backend. Executes strings with queries and receives subscription notifications.GraphqlWsTransport
: WebSocket transport interface for the client.GraphqlWsTransportAiohttp
: WebSocket transport implemented on the AIOHTTP library.
For details check the source code which is thoroughly commented. The docstrings of classes are especially useful.
Since the WebSocket handling is based on the Django Channels and subscriptions are implemented in the Graphene-like style it is recommended to have a look the documentation of these great projects:
The implemented WebSocket-based protocol was taken from the library subscription-transport-ws which is used by the Apollo GraphQL. Check the protocol description for details.
The Subscription.broadcast
uses Channels groups to deliver a message
to the Subscription
's publish
method.
ASGI specification
clearly states what can be sent over a channel, and Django models are
not in the list. Since it is common to notify clients about Django
models changes we manually serialize the payload
using
MessagePack
and hack the process to automatically serialize Django models following
the the Django's guide
Serializing Django objects.
- Different requests from different WebSocket client are processed asynchronously.
- By default different requests (WebSocket messages) from a single
client are processed concurrently in different worker threads. (It is
possible to change the maximum number of worker threads with the
max_worker_threads
setting.) So there is no guarantee that requests will be processed in the same the client sent these requests. Actually, with HTTP we have this behavior for decades. - It is possible to serialize message processing by setting
strict_ordering
toTrue
. But note, this disables parallel requests execution - in other words, the server will not start processing another request from the client before it finishes the current one. See comments in the classGraphqlWsConsumer
. - All subscription notifications are delivered in the order they were issued.
The context object (info.context
in resolvers) is an object-like
wrapper around Channels
scope
typically available as self.scope
in the Channels consumers. So you
can access Channels scope as info.context
. Modifications made in
info.context
are stored in the Channels scope, so they are persisted
as long as WebSocket connection lives. You can work with info.context
both as with dict
or as with SimpleNamespace
:
def resolve_something(self, info):
info.context.fortytwo = 42
assert info.context["fortytwo"] == 42
To enable authentication it is typically enough to wrap your ASGI
application into the channels.auth.AuthMiddlewareStack
:
application = channels.routing.ProtocolTypeRouter({
"websocket": channels.auth.AuthMiddlewareStack(
channels.routing.URLRouter([
django.urls.path("graphql/", MyGraphqlWsConsumer),
])
),
})
This gives you a Django user info.context.user
in all the
resolvers. To authenticate user you can create a Login
mutation like
the following:
class Login(graphene.Mutation, name="LoginPayload"):
"""Login mutation."""
ok = graphene.Boolean(required=True)
class Arguments:
"""Login request arguments."""
username = graphene.String(required=True)
password = graphene.String(required=True)
def mutate(self, info, username, password):
"""Login request."""
# Ask Django to authenticate user.
user = django.contrib.auth.authenticate(username=username, password=password)
if user is None:
return Login(ok=False)
# Use Channels to login, in other words to put proper data to
# the session stored in the scope. The `info.context` is
# practically just a wrapper around Channel `self.scope`, but
# the `login` method requires dict, so use `_asdict`.
asgiref.sync.async_to_sync(channels.auth.login)(info.context._asdict(), user)
# Save the session,cause `channels.auth.login` does not do this.
info.context.session.save()
return Login(ok=True)
The authentication is based on the Channels authentication mechanisms. Check the Channels documentation. Also take a look at the example in the example directory.
There is the GraphqlWsClient
which implements GraphQL client working
over the WebSockets. The client needs a transport instance which
communicates with the server. Transport is an implementation of the
GraphqlWsTransport
interface (class must be derived from it). There is
the GraphqlWsTransportAiohttp
which implements the transport on the
AIOHTTP library. Here is an
example:
transport = channels_graphql_ws.GraphqlWsTransportAiohttp(
"ws://backend.endpoint/graphql/", cookies={"sessionid": session_id}
)
client = channels_graphql_ws.GraphqlWsClient(transport)
await client.connect_and_init()
result = await client.execute("query { users { id login email name } }")
users = result["data"]
await client.finalize()
See the GraphqlWsClient
class docstring for the details.
The GraphiQL provided by Graphene doesn't connect to your GraphQL
endpoint via WebSocket ; instead you should use a modified GraphiQL
template under graphene/graphiql.html
which will take precedence over
the one of Graphene. One such modified GraphiQL is provided in the
example directory.
To test GraphQL WebSocket API read the appropriate page in the Channels documentation.
In order to simplify unit testing there is a GraphqlWsTransport
implementation based on the Django Channels testing communicator:
channels_graphql_ws.testing.GraphqlWsTransport
. Check its docstring
and take a look at the tests to see how to use it.
The original Apollo's protocol does not allow client to know when a
subscription activates. This inevitably leads to the race conditions on
the client side. Sometimes it is not that crucial, but there are cases
when this leads to serious issues.
Here is the discussion
in the
subscriptions-transport-ws
tracker.
To solve this problem, there is the GraphqlWsConsumer
setting
confirm_subscriptions
which when set to True
will make the consumer
issue an additional data
message which confirms the subscription
activation. Please note, you have to modify the client's code to make it
consume this message, otherwise it will be mistakenly considered as the
first subscription notification.
To customize the confirmation message itself set the GraphqlWsConsumer
setting subscription_confirmation_message
. It must be a dictionary
with two keys "data"
and "errors"
. By default it is set to
{"data": None, "errors": None}
.
It is possible to inject middleware into the GraphQL operation
processing. For that define middleware
setting of your
GraphqlWsConsumer
subclass, like this:
def my_middleware(next_middleware, root, info, *args, **kwds):
"""My custom GraphQL middleware."""
# Invoke next middleware.
return next_middleware(root, info, *args, **kwds)
class MyGraphqlWsConsumer(channels_graphql_ws.GraphqlWsConsumer):
...
middleware = [my_middleware]
For more information about GraphQL middleware please take a look at the relevant section in the Graphene documentation.
There is a Tomáš Ehrlich GitHubGist GraphQL Subscription with django-channels which this implementation was initially based on.
There is a promising GraphQL WS library by the Graphene authors. In particular this pull request gives a hope that there will be native Graphene implementation of the WebSocket transport with subscriptions one day.
A reminder of how to setup an environment for the development.
- Install PyEnv to be able to work with many Python versions at once PyEnv→Installation.
- Install Python versions needed. The command should be executed in the project's directory:
$ pyenv local | xargs -L1 pyenv install
- Check that pyenv works correctly. The command:
should show python versions enlisted in .python-version. If everything is set up correctly pyenv will switch version of python when you enter and leave the project's directory. Inside the directory
$ pyenv versions
pyenv which python
should show you a python installed in pyenv, outside the dir it should be the system python. - Install Poetry to the system Python.
It is important to install Poetry into the system Python, NOT in your virtual environment. For details see Poetry docs: https://python-poetry.org/docs/#installation
$ curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python -
- Create local virtualenv in
.venv
, install all project dependencies (frompyproject.toml
) except the project itself.$ poetry install --no-root
- Activate virtualenv
There are options:
- With Poetry:
$ poetry shell
- Manually:
$ source .venv/bin/activate
- With VS Code: Choose
.venv
with "Python: Select interpreter" and reopen the terminal.
- With Poetry:
- Upgrade Pip:
$ pip install --upgrade pip
- Install pre-commit hooks to check code style automatically:
$ pre-commit install
Use:
A reminder of how to run tests.
- Run all tests on all supported Python versions:
$ tox
- Run all tests on a single Python version, e.g on Python 3.7:
$ tox -e py37
- Example of running a single test:
$ tox -e py36 -- tests/test_basic.py::test_main_usecase
- Running on currently active Python directly with Pytest:
$ poetry run pytest
A reminder of how to make and publish a new release.
- Merge all changes to the master branch and switch to it.
- Update version:
poetry version minor
. - Update CHANGELOG.md.
- Update README.md (if needed).
- Commit changes made above.
- Git tag:
git tag vX.X.X && git push --tags
. - Publish release to PyPI:
poetry publish --build
. - Update release notes on GitHub.
This project is developed and maintained by DATADVANCE LLC. Please submit an issue if you have any questions or want to suggest an improvement.
This work is supported by the Russian Foundation for Basic Research (project No. 15-29-07043).