Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle Q.WATCH updates in a single event loop for Websockets #1175

Open
wants to merge 15 commits into
base: master
Choose a base branch
from

Conversation

psrvere
Copy link
Contributor

@psrvere psrvere commented Oct 21, 2024

Closes #1153

In PR #1090 Q.WATCH updates were handled by processQwatchUpdates run in a goroutine. Based on this review comment, this PR moves this handling to a single event loop. It

  • adds subscriptionChan and subscribedClients to WebsocketServer
  • runs listenForSubscriptions and processQwatchUpdates as part of WebsocketServer.Run()
  • shutsdown above two goroutines through both shutdownChan (called by abort command) and ctx.Done channel

This PR also

  • adds ReadResponse function in setup.go to read Q.WATCH updates
  • adds two more integration tests TestQWatchWithMultipleClients and TestQWatchWithMultipleClientsAndQueries
  • changes key names of all tests in qwatch_test.go to be unique so that there is no interference with other tests. Since cancelling subscription (Q.UNWATCH) is not supported yet, any following tests that changes the key also triggers Q.WATCH updates. This is a known issue and will be fixed as part of Add Q.UNWATCH for websocket #1154

This PR also fixes following bugs:

  • Data races accessing conn.WriteMessage. Fix: added a mutex
  • Any error in processing Q.WATCH updates resulted in returning processQwatchUpdates goroutine. Fix: It skips update when it can't process it and logs error.
  • RunWebsocketServer in setup.go was relying on both cancelling parent context to shutdown queryManager goroutine and abort command to shutdown shardManager. Fix: Made it consistent - everythings is gracefully shutdown by abort command

@psrvere psrvere marked this pull request as ready for review October 21, 2024 10:50
@psrvere
Copy link
Contributor Author

psrvere commented Oct 21, 2024

@lucifercr07 - Please review.

@psrvere psrvere changed the title Handle Q.WATCH Updates in a single event loop for Websockets Handle Q.WATCH updates in a single event loop for Websockets Oct 22, 2024
internal/server/websocketServer.go Outdated Show resolved Hide resolved
internal/server/websocketServer.go Outdated Show resolved Hide resolved
internal/server/websocketServer.go Show resolved Hide resolved
internal/server/websocketServer.go Show resolved Hide resolved
@arpit1912
Copy link

Have we done any benchmarking for the Q.Watch and do we want to check if we are not regressing anything with the above change?

@JyotinderSingh
Copy link
Collaborator

Hey @psrvere could you please resolve the conflicts?

@psrvere
Copy link
Contributor Author

psrvere commented Nov 11, 2024

@JyotinderSingh - Resolved. Please review.

@JyotinderSingh JyotinderSingh requested review from JyotinderSingh and removed request for arpit1912 November 11, 2024 23:22
if err != nil && !errors.Is(err, http.ErrServerClosed) {
slog.Error("error while listenting on WebSocket", slog.Any("error", err))
}
s.listenForSubscriptions(wsCtx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies if I'm missing something here, why do we need a separate go routine to listen for QWATCH subscriptions? Can't it be handled as part of websocketServer.ListenAndServe()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lucifercr07 - ListenAndServe receives subscription request which is then handled by WebsocketHandler. It sends subscription event on subscriptionChan and a dedicated goroutine listenForSubscriptions receives the event and subscribes it.

Are you suggesting that we move the work done by listenForSubscriptions in WebsocketHandler?

I created it as a separate goroutine to separate the concern. Do you see any disadvantage in this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be we can let WebsocketHandler do the subscription itself, it won't be an expensive/long-running op.
We've already separated processQwatchUpdates to a separate go-routine.
Commands other than .watch wouldn't be long-running generally and I feel a single go-routine would be enough to handle the clients.
Unless we see some latency issue may be we can avoid creating new go-routines.
Please let me know if I'm missing something here.

internal/server/websocketServer.go Outdated Show resolved Hide resolved
internal/server/websocketServer.go Outdated Show resolved Hide resolved
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Handle Q.WATCH updates in a single event loop for Websockets
4 participants