Skip to content
This repository has been archived by the owner on Aug 6, 2021. It is now read-only.

Commit

Permalink
Merge pull request #128 from overleaf/jpa-hack-around-process-next-tick
Browse files Browse the repository at this point in the history
[misc] hack around process next tick
  • Loading branch information
das7pad authored Apr 29, 2020
2 parents 8620b52 + 73fcba8 commit 67bbdbe
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 35 deletions.
2 changes: 2 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[app/coffee/RoomManager.coffee]
indent_style = space
4 changes: 2 additions & 2 deletions app/coffee/DocumentUpdaterController.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ module.exports = DocumentUpdaterController =
(sender || io).to(doc_id).emit "otUpdateApplied", update

_processErrorFromDocumentUpdater: (io, doc_id, error, message) ->
io.to(doc_id).clients (err, clientIds) ->
RoomManager.getClientsInRoomPseudoAsync io, doc_id, (err, clientIds) ->
if err?
return logger.err {room: doc_id, err}, "failed to get room clients"

for client in clientIds.map((id) -> io.sockets.connected[id]) when client # filter disconnected clients
for client in clientIds.map((id) -> io.sockets.connected[id])
logger.warn err: error, doc_id: doc_id, client_id: client.id, "error from document updater, disconnecting client"
client.emit "otUpdateError", error, message
client.disconnect()
Expand Down
21 changes: 19 additions & 2 deletions app/coffee/RoomManager.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,29 @@ module.exports = RoomManager =
metrics.gauge "room-listeners", RoomEvents.eventNames().length

_clientsInRoom: (client, room, cb) ->
client.server.in(room).clients (err, clients) ->
cb(clients?.length || 0)
cb(@getClientsInRoomSync(client.server, room).length)

_roomsClientIsIn: (client) ->
# skip the socket id
return Object.keys(client.rooms).slice(1)

_clientAlreadyInRoom: (client, room) ->
return client.rooms.hasOwnProperty(room)

getClientsInRoomSync: (io, room) ->
# the implementation in socket.io-adapter is prone to race conditions:
# it passes the list of clients via process.nextTick, but given that
# NodeJS can process multiple network events in the same event loop
# cycle, multiple clients may seem to be the last in a room, but
# actually there are not -- some other client joined in the mean time.
# (mean time: calculating the list of clients and us receiving it)
adapter = io.sockets.adapter
return [] unless adapter.rooms.hasOwnProperty(room)
return Object.keys(adapter.rooms[room].sockets).filter((id) ->
return adapter.nsp.connected[id]
)

# HACK: it calls the callback synchronously -- hence the name pseudoAsync
# calling it asynchronously would lead to race conditions -- see above
getClientsInRoomPseudoAsync: (io, room, cb) ->
cb(null, RoomManager.getClientsInRoomSync(io, room))
2 changes: 1 addition & 1 deletion app/coffee/WebsocketController.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ module.exports = WebsocketController =

RoomManager.leaveProjectAndDocs(client)
setTimeout () ->
io.in(project_id).clients (error, remainingClients) ->
RoomManager.getClientsInRoomPseudoAsync io, project_id, (error, remainingClients) ->
if remainingClients.length == 0
# Flush project in the background
DocumentUpdaterManager.flushProjectToMongoAndDelete project_id, (err) ->
Expand Down
7 changes: 3 additions & 4 deletions app/coffee/WebsocketLoadBalancer.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,25 @@ module.exports = WebsocketLoadBalancer =
if message.room_id == "all"
io.sockets.emit(message.message, message.payload...)
else if message.message is 'clientTracking.refresh' && message.room_id?
io.to(message.room_id).clients (err, clientIds) ->
RoomManager.getClientsInRoomPseudoAsync io, message.room_id, (err, clientIds) ->
if err?
return logger.err {room: message.room_id, err}, "failed to get room clients"
logger.log {channel:channel, message: message.message, room_id: message.room_id, message_id: message._id, socketIoClients: clientIds}, "refreshing client list"
for clientId in clientIds when io.sockets.connected[clientId] # filter disconnected clients
for clientId in clientIds
ConnectedUsersManager.refreshClient(message.room_id, clientId)
else if message.room_id?
if message._id? && Settings.checkEventOrder
status = EventLogger.checkEventOrder("editor-events", message._id, message)
if status is "duplicate"
return # skip duplicate events
io.to(message.room_id).clients (err, clientIds) ->
RoomManager.getClientsInRoomPseudoAsync io, message.room_id, (err, clientIds) ->
if err?
return logger.err {room: message.room_id, err}, "failed to get room clients"

is_restricted_message = message.message not in RESTRICTED_USER_MESSAGE_TYPE_PASS_LIST

clientList = clientIds
.map((id) -> io.sockets.connected[id])
.filter(Boolean) # filter disconnected clients
.filter((client) ->
!(is_restricted_message && client.ol_context['is_restricted_user'])
)
Expand Down
12 changes: 7 additions & 5 deletions test/unit/coffee/DocumentUpdaterControllerTests.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ describe "DocumentUpdaterController", ->
@rclient[1].subscribe = sinon.stub()
@rclient[1].on = sinon.stub()
@EditorUpdatesController.listenForUpdatesFromDocumentUpdater()

it "should subscribe to the doc-updater stream", ->
@rclient[0].subscribe.calledWith("applied-ops").should.equal true

Expand All @@ -57,7 +57,7 @@ describe "DocumentUpdaterController", ->
beforeEach ->
@SafeJsonParse.parse = sinon.stub().callsArgWith 1, new Error("oops")
@EditorUpdatesController._processMessageFromDocumentUpdater @io, "applied-ops", "blah"

it "should log an error", ->
@logger.error.called.should.equal true

Expand Down Expand Up @@ -137,7 +137,7 @@ describe "DocumentUpdaterController", ->
beforeEach ->
@update.dup = true
@EditorUpdatesController._applyUpdateFromDocumentUpdater @io, @doc_id, @update

it "should send a version bump to the source client as usual", ->
@sourceClient.emit
.calledWith("otUpdateApplied", v: @version, doc: @doc_id)
Expand All @@ -158,14 +158,16 @@ describe "DocumentUpdaterController", ->
client_mapping[@clients[0].id] = @clients[0]
client_mapping[@clients[1].id] = @clients[1]
@io.sockets = {connected: client_mapping}
@io.to = sinon.stub().returns(clients: sinon.stub().yields(null, [@clients[0].id, @clients[1].id]))
@RoomManager.getClientsInRoomPseudoAsync = sinon.stub().yields(null, [@clients[0].id, @clients[1].id])
@EditorUpdatesController._processErrorFromDocumentUpdater @io, @doc_id, "Something went wrong"

it "should log a warning", ->
@logger.warn.called.should.equal true

it "should disconnect all clients in that document", ->
@io.to.calledWith(@doc_id).should.equal true
@RoomManager.getClientsInRoomPseudoAsync
.calledWith(@io, @doc_id)
.should.equal true
for client in @clients
client.disconnect.called.should.equal true

64 changes: 62 additions & 2 deletions test/unit/coffee/RoomManagerTests.coffee
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
chai = require('chai')
expect = chai.expect
should = chai.should()
sinon = require("sinon")
modulePath = "../../../app/js/RoomManager.js"
Expand All @@ -19,9 +20,9 @@ describe 'RoomManager', ->
@RoomEvents = @RoomManager.eventSource()
sinon.spy(@RoomEvents, 'emit')
sinon.spy(@RoomEvents, 'once')

describe "joinProject", ->

describe "when the project room is empty", ->

beforeEach (done) ->
Expand Down Expand Up @@ -262,3 +263,62 @@ describe 'RoomManager', ->

it "should not emit any events", ->
@RoomEvents.emit.called.should.equal false

describe "getClientsInRoomSync", ->
beforeEach ->
@io = {
sockets: {
adapter: {
rooms: {},
nsp: {
connected: {}
}
}
}
}
@room = "some-project-id"

describe "when the room does not exist", ->
it "should return an empty array", ->
expect(@RoomManager.getClientsInRoomSync(@io, @room)).to.deep.equal([])

describe "when the room exists", ->
beforeEach ->
@io.sockets.adapter.rooms[@room] = {sockets: {}}

describe "when nobody is in the room", ->
it "should return an empty array", ->
expect(@RoomManager.getClientsInRoomSync(@io, @room))
.to.deep.equal([])


describe "when a client is in the room", ->
beforeEach ->
@clientId = 'some-client-id'
@io.sockets.adapter.rooms[@room].sockets[@clientId] = true

describe "when the client is not connected", ->
it "should return an empty array", ->
expect(@RoomManager.getClientsInRoomSync(@io, @room))
.to.deep.equal([])

describe "when the client is connected", ->
beforeEach ->
@io.sockets.adapter.nsp.connected[@clientId] = { mock: 'client' }

it "should return a list with the clientId", ->
expect(@RoomManager.getClientsInRoomSync(@io, @room))
.to.deep.equal([@clientId])

describe "when two clients are in the room and are connected", ->
beforeEach ->
@clientId = 'some-client-id'
@io.sockets.adapter.rooms[@room].sockets[@clientId] = true
@clientId2 = 'some-client-id-2'
@io.sockets.adapter.rooms[@room].sockets[@clientId2] = true
@io.sockets.adapter.nsp.connected[@clientId] = { mock: 'client1' }
@io.sockets.adapter.nsp.connected[@clientId2] = { mock: 'client1' }

it "should return a list with the two clientIds", ->
expect(@RoomManager.getClientsInRoomSync(@io, @room))
.to.deep.equal([@clientId, @clientId2])
9 changes: 2 additions & 7 deletions test/unit/coffee/WebsocketControllerTests.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,14 @@ describe 'WebsocketController', ->
@WebsocketLoadBalancer.emitToRoom = sinon.stub()
@RoomManager.leaveProjectAndDocs = sinon.stub()
@clientsInRoom = []
@io =
in: (room_id) =>
if room_id != @project_id
throw "expected room_id to be project_id"
{clients: (cb) => cb null, @clientsInRoom}
@RoomManager.getClientsInRoomPseudoAsync = sinon.stub().yields(null, @clientsInRoom)
@client.ol_context.project_id = @project_id
@client.ol_context.user_id = @user_id
@WebsocketController.FLUSH_IF_EMPTY_DELAY = 0
tk.reset() # Allow setTimeout to work.

describe "when the project is empty", ->
beforeEach (done) ->
@clientsInRoom = []
@WebsocketController.leaveProject @io, @client, done

it "should end clientTracking.clientDisconnected to the project room", ->
Expand Down Expand Up @@ -159,7 +154,7 @@ describe 'WebsocketController', ->

describe "when the project is not empty", ->
beforeEach ->
@clientsInRoom = ["mock-remaining-client"]
@clientsInRoom.push("mock-remaining-client")
@WebsocketController.leaveProject @io, @client

it "should not flush the project in the document updater", ->
Expand Down
24 changes: 12 additions & 12 deletions test/unit/coffee/WebsocketLoadBalancerTests.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ describe "WebsocketLoadBalancer", ->
'client-id-1': {id: 'client-id-1', emit: @emit1 = sinon.stub(), ol_context: {}}
'client-id-2': {id: 'client-id-2', emit: @emit2 = sinon.stub(), ol_context: {}}
}
@io.to = sinon.stub().returns(
clients: sinon.stub().yields(null, ['client-id-1', 'client-id-2'])
@RoomManager.getClientsInRoomPseudoAsync = sinon.stub().yields(
null, ['client-id-1', 'client-id-2']
)
data = JSON.stringify
room_id: @room_id
Expand All @@ -93,8 +93,8 @@ describe "WebsocketLoadBalancer", ->
@WebsocketLoadBalancer._processEditorEvent(@io, "editor-events", data)

it "should send the message to all (unique) clients in the room", ->
@io.to
.calledWith(@room_id)
@RoomManager.getClientsInRoomPseudoAsync
.calledWith(@io, @room_id)
.should.equal true
@emit1.calledWith(@message, @payload...).should.equal true
@emit2.calledWith(@message, @payload...).should.equal true
Expand All @@ -106,8 +106,8 @@ describe "WebsocketLoadBalancer", ->
'client-id-2': {id: 'client-id-2', emit: @emit2 = sinon.stub(), ol_context: {}}
'client-id-4': {id: 'client-id-4', emit: @emit4 = sinon.stub(), ol_context: {is_restricted_user: true}}
}
@io.to = sinon.stub().returns(
clients: sinon.stub().yields(null, ['client-id-1', 'client-id-2', 'client-id-4'])
@RoomManager.getClientsInRoomPseudoAsync = sinon.stub().yields(
null, ['client-id-1', 'client-id-2', 'client-id-4']
)
data = JSON.stringify
room_id: @room_id
Expand All @@ -116,8 +116,8 @@ describe "WebsocketLoadBalancer", ->
@WebsocketLoadBalancer._processEditorEvent(@io, "editor-events", data)

it "should send the message to all (unique) clients in the room", ->
@io.to
.calledWith(@room_id)
@RoomManager.getClientsInRoomPseudoAsync
.calledWith(@io, @room_id)
.should.equal true
@emit1.calledWith(@message, @payload...).should.equal true
@emit2.calledWith(@message, @payload...).should.equal true
Expand All @@ -130,8 +130,8 @@ describe "WebsocketLoadBalancer", ->
'client-id-2': {id: 'client-id-2', emit: @emit2 = sinon.stub(), ol_context: {}}
'client-id-4': {id: 'client-id-4', emit: @emit4 = sinon.stub(), ol_context: {is_restricted_user: true}}
}
@io.to = sinon.stub().returns(
clients: sinon.stub().yields(null, ['client-id-1', 'client-id-2', 'client-id-4'])
@RoomManager.getClientsInRoomPseudoAsync = sinon.stub().yields(
null, ['client-id-1', 'client-id-2', 'client-id-4']
)
data = JSON.stringify
room_id: @room_id
Expand All @@ -140,8 +140,8 @@ describe "WebsocketLoadBalancer", ->
@WebsocketLoadBalancer._processEditorEvent(@io, "editor-events", data)

it "should send the message to all (unique) clients in the room, who are not restricted", ->
@io.to
.calledWith(@room_id)
@RoomManager.getClientsInRoomPseudoAsync
.calledWith(@io, @room_id)
.should.equal true
@emit1.calledWith(@restrictedMessage, @payload...).should.equal true
@emit2.calledWith(@restrictedMessage, @payload...).should.equal true
Expand Down

0 comments on commit 67bbdbe

Please sign in to comment.