-
-
Notifications
You must be signed in to change notification settings - Fork 552
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extenstions support for subscriptions. #2097
Comments
In my project, I also need a symmetrical interface of extensions between
Due to an open issue #1864 I will not make a pull request with my solution. Module structure
# subscribe.py
from typing import (
AsyncGenerator,
Sequence,
Type,
Union,
)
from strawberry.extensions import Extension
from strawberry.schema.exceptions import InvalidOperationTypeError
from strawberry.schema.execute import _run_validation
from strawberry.types.graphql import OperationType
from strawberry.types import ExecutionContext
from graphql.subscription import subscribe as original_subscribe
from graphql import (
parse,
GraphQLSchema,
GraphQLError,
ExecutionResult as GraphQLExecutionResult,
)
from strawberry.extensions.runner import ExtensionsRunner
async def subscribe(
schema: GraphQLSchema,
query: str,
*,
extensions: Sequence[Union[Type[Extension], Extension]],
execution_context: ExecutionContext,
) -> AsyncGenerator[GraphQLExecutionResult, None]:
extensions_runner = ExtensionsRunner(
execution_context=execution_context,
extensions=list(extensions),
)
async with extensions_runner.request():
async with extensions_runner.parsing():
if not execution_context.graphql_document:
try:
execution_context.graphql_document = parse(query)
except Exception as err:
error = GraphQLError(str(err), original_error=err) if not isinstance(err, GraphQLError) else err
execution_context.errors = [error]
yield GraphQLExecutionResult(
data=None,
errors=[error],
extensions=await extensions_runner.get_extensions_results(),
)
return
async with extensions_runner.validation():
if execution_context.operation_type is not OperationType.SUBSCRIPTION:
# can be other subscriptions working through the same WS connection
# thats why we are not raising InvalidOperationTypeError as in schema.execute
err = InvalidOperationTypeError(execution_context.operation_type)
execution_context.errors = [GraphQLError(str(err), original_error=err)]
yield GraphQLExecutionResult(data=None, errors=execution_context.errors)
return
_run_validation(execution_context)
if execution_context.errors:
yield GraphQLExecutionResult(data=None, errors=execution_context.errors)
return
async with extensions_runner.executing():
result_source = await original_subscribe(
schema=schema,
document=execution_context.graphql_document,
root_value=execution_context.root_value,
context_value=execution_context.context,
variable_values=execution_context.variables,
operation_name=execution_context.operation_name,
)
if isinstance(result_source, GraphQLExecutionResult):
# An error happen
yield result_source
return
async for exec_result in result_source:
yield exec_result # schema.py
from typing import (
Any,
AsyncGenerator,
Dict,
Optional,
)
import strawberry
from graphql import ExecutionResult as GraphQLExecutionResult
from strawberry.types import ExecutionContext
from .subscribe import subscribe
class Schema(strawberry.Schema):
async def subscribe(
self,
query: str,
variable_values: Optional[Dict[str, Any]] = None,
context_value: Optional[Any] = None,
root_value: Optional[Any] = None,
operation_name: Optional[str] = None,
) -> AsyncGenerator[GraphQLExecutionResult, None]:
# Create execution context
execution_context = ExecutionContext(
query=query,
schema=self,
context=context_value,
root_value=root_value,
variables=variable_values,
provided_operation_name=operation_name,
)
return subscribe(
self._schema,
query,
extensions=self.get_extensions(),
execution_context=execution_context,
) UsageInstead of Schema class from strawberry import our patched version. import asyncio
import strawberry
from strawberry.extensions import Extension
from mydb import get_db_session
from .strawberry_patch.schema import Schema
class MyExtension(Extension):
def on_request_start(self):
self.execution_context.context["db"] = get_db_session()
def on_request_end(self):
self.execution_context.context["db"].close()
@strawberry.type
class Query:
@strawberry.field
def hello(self) -> str:
return "Hello, World"
@strawberry.type
class Subscription:
@strawberry.subscription
async def count(self, target: int = 10) -> AsyncGenerator[int, None]:
for i in range(target):
yield i
await asyncio.sleep(0.5)
SCHEMA = Schema(
query=Query,
subscription=Subscription,
extensions=[MyExtension]
) |
Thank you for that one I've noticed that if we interrupt the Async Generator by stopping the query the following exceptions are thrown:
|
The error is related to changes with the python. You can play around by overwrite the method for your from strawberry.aiohttp.views import (
GraphQLWSHandler,
GraphQLView,
)
class MyGraphQLWSHandler(GraphQLWSHandler):
async def cleanup_operation(self, operation_id: str) -> None:
task_ = self.tasks.pop(operation_id)
task_.cancel()
with suppress(asyncio.CancelledError):
await task_
generator = self.subscriptions.pop(operation_id)
# comment next 2 lines if still get the issue
with suppress(RuntimeError):
generator.aclose()
class MyGraphQLView(GraphQLView):
graphql_ws_handler_class = MyGraphQLWSHandler Or use This is maximum of what I can comment base to provided logs, please also check the answer on stackoverflow related to |
|
@nrbnlulu aggry to you, here also missing extra hooks like: We need to wait for issue #2389 to be completed before thinking of the improvement because at the moment each supported framework integration will require a custom implementation to solve the issue... |
Provide extenstions support for subscriptions.
Feature Request Type
Description
currently with subscriptions won't hook to any of
on_request_start
/end
resolve
get_result
on_validation_start
/end
on_parsing_start
/end
on_executing_start
/end
It would be nice if we can hook it there too
maybe
on_request_start
/end
would hook toconnect
/disconnect
of the application.for the other I am not quite sure..
Upvote & Fund
The text was updated successfully, but these errors were encountered: