[Question] Detecting closed web sockets for subscriptions #528

Closed
erikdrewdunning opened this issue Nov 8, 2022 · 4 comments
Labels
status: superseded Issue is superseded by another

Comments

@erikdrewdunning

We are creating a new API and would like to push Order events to our partners via GraphQL subscriptions.
I am new to Reactive Streams, but I have come up with a basic approach that retrieves order updates from our database, publishes them to a Flux with a generator, and sends them out over a GraphQL subscription, like so:

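@SubscriptionMapping
public Flux<Order> orders(@Argument Long id) {
    // Each generator round emits one inner Flux holding the orders newer than the last seen id;
    // Flux.concat flattens those inner Fluxes back into a single Flux<Order>.
    Flux<Order> flux = Flux.concat(Flux.generate(() -> id, (orderId, synchronousSink) -> {
        List<Order> orders = getNewOrdersFromDB(orderId);
        Flux<Order> orderFlux = Flux.fromIterable(orders);
        synchronousSink.next(orderFlux);
        if (orders.isEmpty()) {
            return orderId; // nothing new: poll again from the same id
        } else {
            return orders.get(orders.size() - 1).getId(); // advance to the newest order id
        }
    }));
    return flux;
}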

Testing this with GraphiQL, I see two problems when I close the window, and with it the WebSocket:

  1. This exception is thrown when a new Order is sent to the Sink after the connection is closed:
2022-11-08 15:18:47.195 ERROR 33912 --- [-67e73587779f-1] o.s.g.s.webmvc.GraphQlWebSocketHandler   : Closing session due to exception for StandardWebSocketSession[id=e57e5bc6-445e-bbd7-d058-67e73587779f, uri=ws://localhost:8080/graphql]

java.io.IOException: java.io.IOException: Broken pipe
	at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock(WsRemoteEndpointImplBase.java:327) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]
	at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock(WsRemoteEndpointImplBase.java:254) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]
	at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendPartialString(WsRemoteEndpointImplBase.java:227) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]
	at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendText(WsRemoteEndpointBasic.java:49) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]
	at org.springframework.web.socket.adapter.standard.StandardWebSocketSession.sendTextMessage(StandardWebSocketSession.java:215) ~[spring-websocket-5.3.23.jar:5.3.23]
	at org.springframework.web.socket.adapter.AbstractWebSocketSession.sendMessage(AbstractWebSocketSession.java:106) ~[spring-websocket-5.3.23.jar:5.3.23]
	at org.springframework.graphql.server.webmvc.GraphQlWebSocketHandler$SendMessageSubscriber.hookOnNext(GraphQlWebSocketHandler.java:562) ~[spring-graphql-1.0.2.jar:1.0.2]
	at org.springframework.graphql.server.webmvc.GraphQlWebSocketHandler$SendMessageSubscriber.hookOnNext(GraphQlWebSocketHandler.java:540) ~[spring-graphql-1.0.2.jar:1.0.2]
	at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) ~[reactor-core-3.4.24.jar:3.4.24]
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.io.IOException: Broken pipe
	at java.base/sun.nio.ch.FileDispatcherImpl.writev0(Native Method) ~[na:na]
	at java.base/sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:66) ~[na:na]
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:227) ~[na:na]
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:158) ~[na:na]
	at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:563) ~[na:na]
	at org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:147) ~[tomcat-embed-core-9.0.68.jar:9.0.68]
	at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper$NioOperationState.run(NioEndpoint.java:1680) ~[tomcat-embed-core-9.0.68.jar:9.0.68]
	at org.apache.tomcat.util.net.SocketWrapperBase$OperationState.start(SocketWrapperBase.java:1070) ~[tomcat-embed-core-9.0.68.jar:9.0.68]
	at org.apache.tomcat.util.net.SocketWrapperBase.vectoredOperation(SocketWrapperBase.java:1489) ~[tomcat-embed-core-9.0.68.jar:9.0.68]
	at org.apache.tomcat.util.net.SocketWrapperBase.write(SocketWrapperBase.java:1415) ~[tomcat-embed-core-9.0.68.jar:9.0.68]
	at org.apache.tomcat.util.net.SocketWrapperBase.write(SocketWrapperBase.java:1386) ~[tomcat-embed-core-9.0.68.jar:9.0.68]
	at org.apache.tomcat.websocket.server.WsRemoteEndpointImplServer.doWrite(WsRemoteEndpointImplServer.java:93) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]
	at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.writeMessagePart(WsRemoteEndpointImplBase.java:512) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]
	at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock(WsRemoteEndpointImplBase.java:314) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]
	... 17 common frames omitted

  2. Second, if no orders are retrieved from the DB and published to the Flux after GraphiQL is closed, no cancel or completion signal seems to reach the generator, so it keeps running until an order is sent to the sink, at which point the exception above is thrown.

So my question is: is there a way for the server to periodically ping the client through the WebSocket and shut down the Flux generator gracefully if the connection is closed?
The graphql-ws protocol appears to provide 'ping' and 'pong' message types (see https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md#ping). Perhaps the interval for these could be configurable, or a ping could be sent whenever no messages have been received within a certain time period.
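For reference, the keep-alive messages in that protocol are small JSON objects, {"type":"ping"} and {"type":"pong"}, each with an optional payload, and a peer that receives a ping is expected to answer with a pong as soon as possible. The sketch below shows only that exchange. The class and method names are hypothetical, and the JSON is hand-written to stay dependency-free; a real handler would parse incoming messages with a JSON library.

```java
import java.util.Optional;

/**
 * Hypothetical sketch of the graphql-ws keep-alive exchange.
 * JSON is hand-built for brevity; real code would use a JSON library.
 */
final class GraphQlWsKeepAlive {

    static final String PING = "{\"type\":\"ping\"}";
    static final String PONG = "{\"type\":\"pong\"}";

    private GraphQlWsKeepAlive() {
    }

    /**
     * Either side may send a ping; the receiver answers it with a pong.
     * A pong needs no reply: it only shows that the peer is still reachable.
     */
    static Optional<String> replyTo(String incomingType) {
        if ("ping".equals(incomingType)) {
            return Optional.of(PONG);
        }
        return Optional.empty();
    }
}
```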
Thanks for any and all advice or info you can provide.
A sample project that reproduces this behavior is available here: https://github.com/erikdrewdunning/subscription-demo
Also, here is a simple GraphQL subscription request that works against the server:

subscription {
  orders(id: "1") {
    id
  }
}
spring-projects-issues added the status: waiting-for-triage label Nov 8, 2022
@rstoyanchev
Contributor

There is currently no support for sending PING messages from the server. We could add a feature along those lines, configured either through a Flux that drives the pings or through a frequency at which to send them. That said, GraphQlWebSocketHandler does respond to PING messages with a PONG, so if your client library can send them, you can try turning that on from the client side.
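If the client can be configured or extended to send pings, a fixed-interval sender is enough, since the server replies to each one. A minimal JDK-only sketch follows; ClientPinger is a hypothetical name, and the Consumer<String> stands in for whatever send-text method your WebSocket client library provides.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical client-side keep-alive: sends a graphql-ws ping at a fixed interval.
 * sendText is a placeholder for the client library's "send text frame" call.
 */
final class ClientPinger implements AutoCloseable {

    private static final String PING = "{\"type\":\"ping\"}";

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final ScheduledFuture<?> task;

    ClientPinger(Duration interval, Consumer<String> sendText) {
        long millis = interval.toMillis();
        // If sendText throws (e.g. the socket is already closed), scheduleAtFixedRate
        // suppresses all later runs, so the pings stop on their own.
        this.task = scheduler.scheduleAtFixedRate(
                () -> sendText.accept(PING), millis, millis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        task.cancel(false);
        scheduler.shutdownNow();
    }
}
```

A failed send is also how the client learns that the connection is gone, which is the same detection problem discussed in this thread, seen from the other side.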

For the "Broken pipe" error, Spring Framework already has support for treating such exceptions as noise (the normal case of a client disconnecting): they are logged with a dedicated logger at DEBUG without a stack trace, and at TRACE with the full stack trace, so you won't see them in production. Is this what you're looking for? If so, we can make this change, but it will likely need to be in the spring-websocket module, i.e. on the Spring Framework side rather than here.
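The same idea can be applied in application code while waiting for such a change. The helper below is hypothetical (it is not the Spring Framework implementation): it walks the cause chain looking for IOException messages that typically mean the peer went away. The list of phrases is an assumption; the exact wording varies by OS and container.

```java
import java.io.IOException;
import java.util.List;
import java.util.Locale;

/** Hypothetical check for "the client simply disconnected" exceptions. */
final class DisconnectedClientCheck {

    // Assumed, non-exhaustive phrases seen in disconnect-related IOExceptions.
    private static final List<String> PHRASES = List.of("broken pipe", "connection reset by peer");

    private DisconnectedClientCheck() {
    }

    static boolean isClientDisconnect(Throwable ex) {
        Throwable t = ex;
        // Walk the cause chain (bounded in case of cycles): Tomcat wraps the
        // socket-level "Broken pipe" IOException in another IOException.
        for (int depth = 0; t != null && depth < 16; depth++, t = t.getCause()) {
            String msg = t.getMessage();
            if (t instanceof IOException && msg != null) {
                String lower = msg.toLowerCase(Locale.ROOT);
                for (String phrase : PHRASES) {
                    if (lower.contains(phrase)) {
                        return true;
                    }
                }
            }
        }
        return false;
    }
}
```

A caller could then log matching exceptions at DEBUG without a stack trace and everything else at ERROR.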

To detect a lost connection or a cancelled subscription, you can add doOnCancel after Flux.generate, which tells you when the generator can stop producing. There is also the more global WebSocketGraphQlInterceptor, which notifies you when a connection is closed and/or when a subscription is cancelled. Both, however, depend on when the server actually detects that the connection is closed, which brings us back to sending PING messages.
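A minimal sketch of the doOnCancel suggestion, assuming reactor-core on the classpath. CancellablePoller is a hypothetical name, and the incrementing counter stands in for the database query.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Flux;

/** Hypothetical poller that records when its subscriber cancels. */
final class CancellablePoller {

    final AtomicBoolean cancelled = new AtomicBoolean();

    Flux<Long> poll(long startId) {
        return Flux.<Long, Long>generate(() -> startId, (lastId, sink) -> {
                    long next = lastId + 1; // stand-in for "query the next order"
                    sink.next(next);        // one onNext per round, as Flux.generate requires
                    return next;
                })
                // Runs when downstream cancels, e.g. after the subscription is dropped;
                // from then on the generator receives no further demand.
                .doOnCancel(() -> cancelled.set(true));
    }
}
```

For example, poller.poll(0L).take(3) emits 1, 2 and 3 and then cancels upstream, which sets cancelled to true. Here take(3) plays the role of the WebSocket subscriber going away.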

As an aside, I simplified your method a bit:

@SubscriptionMapping
public Flux<Order> orders(@Argument Long id) {
    return Flux.generate(() -> id, (lastOrderId, sink) -> {
        List<Order> orders = getNewOrdersFromDB(lastOrderId);
        orders.forEach(sink::next);
        return (!orders.isEmpty() ? getLastOrderId(orders) : lastOrderId);
    });
}

rstoyanchev added the status: waiting-for-feedback label Nov 9, 2022
@erikdrewdunning
Author

Thanks for the quick response, I really appreciate it!

I think it would be very useful to add the ability to send pings from the server. We are currently testing our subscription server in an ECS container on AWS, and the load balancer closes the WebSocket connection once it has been idle longer than the idle timeout, which can be set to at most 4000 seconds (about 66 minutes). Periodic pings from the server would keep the connection from going idle, so the load balancer would not close it during periods of no activity, and closed connections would also be detected sooner. It would be nice to be able to specify either a fixed interval at which pings are always sent, or an idle time after which a ping is sent if there has been no activity on the connection.
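As a worked number, 4000 seconds is 66 minutes 40 seconds, so any keep-alive threshold well below that (say 30 seconds) should keep the connection from looking idle to the load balancer, assuming ping frames count as traffic for it. The second option described above, pinging only after a period of inactivity, could look like the following sketch. IdleKeepAlive and its methods are hypothetical; check() would be called periodically, for example from a ScheduledExecutorService, and the time source is injectable so the logic can be tested without waiting (pass Instant::now in production).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical idle-based keep-alive: ping only after idleThreshold of silence. */
final class IdleKeepAlive {

    private static final String PING = "{\"type\":\"ping\"}";

    private final Supplier<Instant> now;
    private final Duration idleThreshold;
    private final Consumer<String> sendText;
    private Instant lastSent;

    IdleKeepAlive(Supplier<Instant> now, Duration idleThreshold, Consumer<String> sendText) {
        this.now = now;
        this.idleThreshold = idleThreshold;
        this.sendText = sendText;
        this.lastSent = now.get();
    }

    /** Record that a regular message (e.g. a subscription event) just went out. */
    synchronized void onMessageSent() {
        lastSent = now.get();
    }

    /** Send a ping if nothing has been sent for at least idleThreshold; returns true if it pinged. */
    synchronized boolean check() {
        Instant current = now.get();
        if (Duration.between(lastSent, current).compareTo(idleThreshold) >= 0) {
            sendText.accept(PING);
            lastSent = current;
            return true;
        }
        return false;
    }
}
```

Compared with a fixed interval, this sends nothing extra while events are flowing and only fills the gaps.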

It looks like Apollo Server actually has a feature like this called keepAlive, see the documentation here: https://www.apollographql.com/docs/apollo-server/v2/api/apollo-server/

keepAlive The frequency with which the subscription endpoint should send keep-alive messages to open client connections, in milliseconds, if any.

What is the process for requesting that a keepAlive feature be added to your backlog?

Thanks for the code suggestion. I also found my code a bit complicated, but it is complicated because a generator allows only one sink.next call per round. Your code works in my simplified example, where only one Order is returned from the DB, but unfortunately it fails with an IllegalStateException when multiple orders are returned:

orders.forEach(sink::next);

The only way I could figure out to emit multiple elements (in this case Orders) in one generator round was to wrap them all in a new Flux, pass that Flux to sink.next, and finally apply Flux.concat to the generator itself to "flatten" the result back into the Flux returned by the @SubscriptionMapping method. This approach also lets me call sink.next when no Orders are returned from the DB. I'm afraid I don't fully understand Reactive Streams, though, and I'm actually surprised my solution worked at all. I was also wondering whether the Flux.concat call might interfere with the downstream demand signal that requests more Orders.

spring-projects-issues added status: feedback-provided and removed status: waiting-for-feedback labels Nov 10, 2022
@rstoyanchev
Contributor

rstoyanchev commented Nov 14, 2022

Thanks for the extra details. I've created #534 to add support for server pings.

You're right that SynchronousSink accepts only one item per round. Start by passing the whole list, i.e. sink.next(orders). Then you can use flatMapIterable if you want to send each updated order as an individual event, or you can model each event as a list of updates, which reduces the overall number of messages sent:

type Subscription {
    orders(id: ID): [Order!]
}
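A sketch of both options, assuming reactor-core on the classpath. To stay self-contained it streams order ids (Long) instead of Order objects, and fetchIdsAfter is a hypothetical stand-in for getNewOrdersFromDB. Each generator round makes exactly one sink.next call with the whole list; flatMapIterable then flattens it into individual events, while the per-batch variant drops empty rounds and emits each list as a single event, matching the [Order!] return type in the schema above.

```java
import java.util.List;
import java.util.function.LongFunction;
import reactor.core.publisher.Flux;

/** Hypothetical helpers; fetchIdsAfter returns ids newer than the given id. */
final class OrderStreams {

    private OrderStreams() {
    }

    /** One List per generator round (empty when nothing is new). */
    static Flux<List<Long>> batches(long startId, LongFunction<List<Long>> fetchIdsAfter) {
        return Flux.<List<Long>, Long>generate(() -> startId, (lastId, sink) -> {
            List<Long> ids = fetchIdsAfter.apply(lastId);
            sink.next(ids); // a single call per round, even for many orders
            return ids.isEmpty() ? lastId : ids.get(ids.size() - 1);
        });
    }

    /** Option 1: one subscription event per order. */
    static Flux<Long> individually(long startId, LongFunction<List<Long>> fetchIdsAfter) {
        return batches(startId, fetchIdsAfter).flatMapIterable(ids -> ids);
    }

    /** Option 2: one event per non-empty batch. */
    static Flux<List<Long>> perBatch(long startId, LongFunction<List<Long>> fetchIdsAfter) {
        return batches(startId, fetchIdsAfter).filter(ids -> !ids.isEmpty());
    }
}
```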

Yet another option to consider is to use https://r2dbc.io/.

@rstoyanchev
Contributor

Closing for now as superseded by #534 but feel free to comment.

rstoyanchev closed this as not planned Nov 16, 2022
rstoyanchev added status: superseded and removed status: feedback-provided, status: waiting-for-triage labels Nov 16, 2022