-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#1008: Added Q.WATCH support to Websocket #1090
Conversation
@lucifercr07 @pratikpandey21 - Added Qwatch to Websocket. Please review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd recommend waiting for #1086
and leverage the code there as that would enable multithreading in WS.
Will review once those changes are there
for { | ||
// read incoming message | ||
_, msg, err := conn.ReadMessage() | ||
if err != nil { | ||
writeResponse(conn, []byte("error: command reading failed")) | ||
continue | ||
break | ||
} | ||
|
||
// parse message to dice command | ||
diceDBCmd, err := utils.ParseWebsocketMessage(msg) | ||
if errors.Is(err, diceerrors.ErrEmptyCommand) { | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we log a warn here? Or return an error to client. As client is just receiving an empty response in this case which doesn't seem to be correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a warning here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@psrvere not able to see the log statement added here, is commit pushed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lucifercr07 - the warning was added as part of #1142 - please see here
That was important for integration tests to work so had to move to that PR.
"-2", // Represents a RESP Integer with value -2. | ||
"*0", // Represents an empty RESP Array. | ||
// start a goroutine for subsequent updates | ||
go s.processQwatchUpdates(clientIdentifierID, conn, diceDBCmd) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we refactor this such that we don't spawn new go routine for each qwatch cmd subscription?
We can create another channel at start for QWATCH and keep registering clients to it and handle all responses in a single event loop.
This can also be taken as enhancement on top of this PR, not a blocker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is absolutely critical. I realised this while writing tests with multiple subscribers. Have created a follow up issue #1153
} | ||
case <-s.shutdownChan: | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we explicitly send qunwatch
command from here and de-register the client?
Currently the cleanup happens after the key is triggered with a error message as websocket closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Let me also figure out how q.unwatch will work in websocket. Will address this comment as part of #1154
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestQWatch(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add a test where multiple subscribers are there for query and responses are triggered for them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the test but with single subscriber right now. As part of #1153, I will increase the number of subscribers to 5 (arbitrary)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test also will come as part of #1153. Currently previous tests leave a subscriber which also receive response when value of a key is updated. So it become multi subscriber problem again.
@lucifercr07 - have addressed all comments and created new issues where needed. Please review again. |
for { | ||
// read incoming message | ||
_, msg, err := conn.ReadMessage() | ||
if err != nil { | ||
writeResponse(conn, []byte("error: command reading failed")) | ||
continue | ||
break | ||
} | ||
|
||
// parse message to dice command | ||
diceDBCmd, err := utils.ParseWebsocketMessage(msg) | ||
if errors.Is(err, diceerrors.ErrEmptyCommand) { | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@psrvere not able to see the log statement added here, is commit pushed?
@lucifercr07 - merging with latest master and resolving a few conflicts. |
@lucifercr07 - good to go, please check. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Closes #1008
This PR
a. updates
WebsocketServer
struct to useioChan
andqwatchResponseChan
b. extracts out response processing logic to a function
c. adds a function to process Q.WATCH updates
ParseWebsocketMessage
to support Q.WATCH command