From 3eba1996c8fd0a8fe35b6bc1d08872db0925163c Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Tue, 28 Apr 2020 11:43:05 +0200 Subject: [PATCH 1/4] [misc] get the list of clients in a room in a synchronous fashion the implementation in socket.io-adapter is prone to race conditions: it passes the list of clients via process.nextTick, but given that NodeJS can process multiple network events in the same event loop cycle, multiple clients may seem to be the last in a room, but actually there are not -- some other client joined in the mean time. (mean time: calculating the list of clients and us receiving it) --- app/coffee/DocumentUpdaterController.coffee | 2 +- app/coffee/RoomManager.coffee | 21 ++++++++++++++-- app/coffee/WebsocketController.coffee | 2 +- app/coffee/WebsocketLoadBalancer.coffee | 4 ++-- .../DocumentUpdaterControllerTests.coffee | 12 ++++++---- .../coffee/WebsocketControllerTests.coffee | 9 ++----- .../coffee/WebsocketLoadBalancerTests.coffee | 24 +++++++++---------- 7 files changed, 44 insertions(+), 30 deletions(-) diff --git a/app/coffee/DocumentUpdaterController.coffee b/app/coffee/DocumentUpdaterController.coffee index 93127af9..86e31d4c 100644 --- a/app/coffee/DocumentUpdaterController.coffee +++ b/app/coffee/DocumentUpdaterController.coffee @@ -76,7 +76,7 @@ module.exports = DocumentUpdaterController = (sender || io).to(doc_id).emit "otUpdateApplied", update _processErrorFromDocumentUpdater: (io, doc_id, error, message) -> - io.to(doc_id).clients (err, clientIds) -> + RoomManager.getClientsInRoomPseudoAsync io, doc_id, (err, clientIds) -> if err? return logger.err {room: doc_id, err}, "failed to get room clients" diff --git a/app/coffee/RoomManager.coffee b/app/coffee/RoomManager.coffee index 83d9f2a7..44cc03bb 100644 --- a/app/coffee/RoomManager.coffee +++ b/app/coffee/RoomManager.coffee @@ -87,8 +87,7 @@ module.exports = RoomManager = metrics.gauge "room-listeners", RoomEvents.eventNames().length _clientsInRoom: (client, room, cb) -> - client.server.in(room).clients (err, clients) -> - cb(clients?.length || 0) + cb(@getClientsInRoomSync(client.server, room).length) _roomsClientIsIn: (client) -> # skip the socket id @@ -96,3 +95,21 @@ module.exports = RoomManager = _clientAlreadyInRoom: (client, room) -> return client.rooms.hasOwnProperty(room) + + getClientsInRoomSync: (io, room) -> + # the implementation in socket.io-adapter is prone to race conditions: + # it passes the list of clients via process.nextTick, but given that + # NodeJS can process multiple network events in the same event loop + # cycle, multiple clients may seem to be the last in a room, but + # actually there are not -- some other client joined in the mean time. + # (mean time: calculating the list of clients and us receiving it) + adapter = io.sockets.adapter + return [] unless adapter.rooms.hasOwnProperty(room) + return Object.keys(adapter.rooms[room].sockets).filter((id) -> + return adapter.nsp.connected[id] + ) + + # HACK: it calls the callback synchronously -- hence the name pseudoAsync + # calling it asynchronously would lead to race conditions -- see above + getClientsInRoomPseudoAsync: (io, room, cb) -> + cb(null, RoomManager.getClientsInRoomSync(io, room)) diff --git a/app/coffee/WebsocketController.coffee b/app/coffee/WebsocketController.coffee index 7d9d3385..b47905a0 100644 --- a/app/coffee/WebsocketController.coffee +++ b/app/coffee/WebsocketController.coffee @@ -74,7 +74,7 @@ module.exports = WebsocketController = RoomManager.leaveProjectAndDocs(client) setTimeout () -> - io.in(project_id).clients (error, remainingClients) -> + RoomManager.getClientsInRoomPseudoAsync io, project_id, (error, remainingClients) -> if remainingClients.length == 0 # Flush project in the background DocumentUpdaterManager.flushProjectToMongoAndDelete project_id, (err) -> diff --git a/app/coffee/WebsocketLoadBalancer.coffee b/app/coffee/WebsocketLoadBalancer.coffee index 69267e17..ff5ebfd0 100644 --- a/app/coffee/WebsocketLoadBalancer.coffee +++ b/app/coffee/WebsocketLoadBalancer.coffee @@ -67,7 +67,7 @@ module.exports = WebsocketLoadBalancer = if message.room_id == "all" io.sockets.emit(message.message, message.payload...) else if message.message is 'clientTracking.refresh' && message.room_id? - io.to(message.room_id).clients (err, clientIds) -> + RoomManager.getClientsInRoomPseudoAsync io, message.room_id, (err, clientIds) -> if err? return logger.err {room: message.room_id, err}, "failed to get room clients" logger.log {channel:channel, message: message.message, room_id: message.room_id, message_id: message._id, socketIoClients: clientIds}, "refreshing client list" @@ -78,7 +78,7 @@ module.exports = WebsocketLoadBalancer = status = EventLogger.checkEventOrder("editor-events", message._id, message) if status is "duplicate" return # skip duplicate events - io.to(message.room_id).clients (err, clientIds) -> + RoomManager.getClientsInRoomPseudoAsync io, message.room_id, (err, clientIds) -> if err? return logger.err {room: message.room_id, err}, "failed to get room clients" diff --git a/test/unit/coffee/DocumentUpdaterControllerTests.coffee b/test/unit/coffee/DocumentUpdaterControllerTests.coffee index b991f8b7..909dda34 100644 --- a/test/unit/coffee/DocumentUpdaterControllerTests.coffee +++ b/test/unit/coffee/DocumentUpdaterControllerTests.coffee @@ -41,7 +41,7 @@ describe "DocumentUpdaterController", -> @rclient[1].subscribe = sinon.stub() @rclient[1].on = sinon.stub() @EditorUpdatesController.listenForUpdatesFromDocumentUpdater() - + it "should subscribe to the doc-updater stream", -> @rclient[0].subscribe.calledWith("applied-ops").should.equal true @@ -57,7 +57,7 @@ describe "DocumentUpdaterController", -> beforeEach -> @SafeJsonParse.parse = sinon.stub().callsArgWith 1, new Error("oops") @EditorUpdatesController._processMessageFromDocumentUpdater @io, "applied-ops", "blah" - + it "should log an error", -> @logger.error.called.should.equal true @@ -137,7 +137,7 @@ describe "DocumentUpdaterController", -> beforeEach -> @update.dup = true @EditorUpdatesController._applyUpdateFromDocumentUpdater @io, @doc_id, @update - + it "should send a version bump to the source client as usual", -> @sourceClient.emit .calledWith("otUpdateApplied", v: @version, doc: @doc_id) @@ -158,14 +158,16 @@ describe "DocumentUpdaterController", -> client_mapping[@clients[0].id] = @clients[0] client_mapping[@clients[1].id] = @clients[1] @io.sockets = {connected: client_mapping} - @io.to = sinon.stub().returns(clients: sinon.stub().yields(null, [@clients[0].id, @clients[1].id])) + @RoomManager.getClientsInRoomPseudoAsync = sinon.stub().yields(null, [@clients[0].id, @clients[1].id]) @EditorUpdatesController._processErrorFromDocumentUpdater @io, @doc_id, "Something went wrong" it "should log a warning", -> @logger.warn.called.should.equal true it "should disconnect all clients in that document", -> - @io.to.calledWith(@doc_id).should.equal true + @RoomManager.getClientsInRoomPseudoAsync + .calledWith(@io, @doc_id) + .should.equal true for client in @clients client.disconnect.called.should.equal true diff --git a/test/unit/coffee/WebsocketControllerTests.coffee b/test/unit/coffee/WebsocketControllerTests.coffee index f7980731..e2e91480 100644 --- a/test/unit/coffee/WebsocketControllerTests.coffee +++ b/test/unit/coffee/WebsocketControllerTests.coffee @@ -119,11 +119,7 @@ describe 'WebsocketController', -> @WebsocketLoadBalancer.emitToRoom = sinon.stub() @RoomManager.leaveProjectAndDocs = sinon.stub() @clientsInRoom = [] - @io = - in: (room_id) => - if room_id != @project_id - throw "expected room_id to be project_id" - {clients: (cb) => cb null, @clientsInRoom} + @RoomManager.getClientsInRoomPseudoAsync = sinon.stub().yields(null, @clientsInRoom) @client.ol_context.project_id = @project_id @client.ol_context.user_id = @user_id @WebsocketController.FLUSH_IF_EMPTY_DELAY = 0 @@ -131,7 +127,6 @@ describe 'WebsocketController', -> describe "when the project is empty", -> beforeEach (done) -> - @clientsInRoom = [] @WebsocketController.leaveProject @io, @client, done it "should end clientTracking.clientDisconnected to the project room", -> @@ -159,7 +154,7 @@ describe 'WebsocketController', -> describe "when the project is not empty", -> beforeEach -> - @clientsInRoom = ["mock-remaining-client"] + @clientsInRoom.push("mock-remaining-client") @WebsocketController.leaveProject @io, @client it "should not flush the project in the document updater", -> diff --git a/test/unit/coffee/WebsocketLoadBalancerTests.coffee b/test/unit/coffee/WebsocketLoadBalancerTests.coffee index 9ee7a2ab..db56b0e4 100644 --- a/test/unit/coffee/WebsocketLoadBalancerTests.coffee +++ b/test/unit/coffee/WebsocketLoadBalancerTests.coffee @@ -83,8 +83,8 @@ describe "WebsocketLoadBalancer", -> 'client-id-1': {id: 'client-id-1', emit: @emit1 = sinon.stub(), ol_context: {}} 'client-id-2': {id: 'client-id-2', emit: @emit2 = sinon.stub(), ol_context: {}} } - @io.to = sinon.stub().returns( - clients: sinon.stub().yields(null, ['client-id-1', 'client-id-2']) + @RoomManager.getClientsInRoomPseudoAsync = sinon.stub().yields( + null, ['client-id-1', 'client-id-2'] ) data = JSON.stringify room_id: @room_id @@ -93,8 +93,8 @@ describe "WebsocketLoadBalancer", -> @WebsocketLoadBalancer._processEditorEvent(@io, "editor-events", data) it "should send the message to all (unique) clients in the room", -> - @io.to - .calledWith(@room_id) + @RoomManager.getClientsInRoomPseudoAsync + .calledWith(@io, @room_id) .should.equal true @emit1.calledWith(@message, @payload...).should.equal true @emit2.calledWith(@message, @payload...).should.equal true @@ -106,8 +106,8 @@ describe "WebsocketLoadBalancer", -> 'client-id-2': {id: 'client-id-2', emit: @emit2 = sinon.stub(), ol_context: {}} 'client-id-4': {id: 'client-id-4', emit: @emit4 = sinon.stub(), ol_context: {is_restricted_user: true}} } - @io.to = sinon.stub().returns( - clients: sinon.stub().yields(null, ['client-id-1', 'client-id-2', 'client-id-4']) + @RoomManager.getClientsInRoomPseudoAsync = sinon.stub().yields( + null, ['client-id-1', 'client-id-2', 'client-id-4'] ) data = JSON.stringify room_id: @room_id @@ -116,8 +116,8 @@ describe "WebsocketLoadBalancer", -> @WebsocketLoadBalancer._processEditorEvent(@io, "editor-events", data) it "should send the message to all (unique) clients in the room", -> - @io.to - .calledWith(@room_id) + @RoomManager.getClientsInRoomPseudoAsync + .calledWith(@io, @room_id) .should.equal true @emit1.calledWith(@message, @payload...).should.equal true @emit2.calledWith(@message, @payload...).should.equal true @@ -130,8 +130,8 @@ describe "WebsocketLoadBalancer", -> 'client-id-2': {id: 'client-id-2', emit: @emit2 = sinon.stub(), ol_context: {}} 'client-id-4': {id: 'client-id-4', emit: @emit4 = sinon.stub(), ol_context: {is_restricted_user: true}} } - @io.to = sinon.stub().returns( - clients: sinon.stub().yields(null, ['client-id-1', 'client-id-2', 'client-id-4']) + @RoomManager.getClientsInRoomPseudoAsync = sinon.stub().yields( + null, ['client-id-1', 'client-id-2', 'client-id-4'] ) data = JSON.stringify room_id: @room_id @@ -140,8 +140,8 @@ describe "WebsocketLoadBalancer", -> @WebsocketLoadBalancer._processEditorEvent(@io, "editor-events", data) it "should send the message to all (unique) clients in the room, who are not restricted", -> - @io.to - .calledWith(@room_id) + @RoomManager.getClientsInRoomPseudoAsync + .calledWith(@io, @room_id) .should.equal true @emit1.calledWith(@restrictedMessage, @payload...).should.equal true @emit2.calledWith(@restrictedMessage, @payload...).should.equal true From a6060b9706cb24c7242a100e927e6af72d40cf87 Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Tue, 28 Apr 2020 11:47:05 +0200 Subject: [PATCH 2/4] [misc] change the indent_style to spaces for the RoomManager --- .editorconfig | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 00000000..69d731be --- /dev/null +++ b/.editorconfig @@ -0,0 +1,2 @@ +[app/coffee/RoomManager.coffee] +indent_style = space From 6ee86754066c7b42758cf9012a0bdfcaf38cabd4 Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Tue, 28 Apr 2020 11:50:24 +0200 Subject: [PATCH 3/4] [misc] remove safe guards from processing previously unsafe client lists RoomManager.getClientsInRoomPseudoAsync is checking for existing clients and returns the result synchronously to us. This reverts commit 4dacb0fa37c37576959d3804ae177910bc7e1e3a Bring back 8026003f8e9dab506d1a8a3e6e5f41188555a6bb --- app/coffee/DocumentUpdaterController.coffee | 2 +- app/coffee/WebsocketLoadBalancer.coffee | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/app/coffee/DocumentUpdaterController.coffee b/app/coffee/DocumentUpdaterController.coffee index 86e31d4c..06a6276a 100644 --- a/app/coffee/DocumentUpdaterController.coffee +++ b/app/coffee/DocumentUpdaterController.coffee @@ -80,7 +80,7 @@ module.exports = DocumentUpdaterController = if err? return logger.err {room: doc_id, err}, "failed to get room clients" - for client in clientIds.map((id) -> io.sockets.connected[id]) when client # filter disconnected clients + for client in clientIds.map((id) -> io.sockets.connected[id]) logger.warn err: error, doc_id: doc_id, client_id: client.id, "error from document updater, disconnecting client" client.emit "otUpdateError", error, message client.disconnect() diff --git a/app/coffee/WebsocketLoadBalancer.coffee b/app/coffee/WebsocketLoadBalancer.coffee index ff5ebfd0..50e4da0c 100644 --- a/app/coffee/WebsocketLoadBalancer.coffee +++ b/app/coffee/WebsocketLoadBalancer.coffee @@ -71,7 +71,7 @@ module.exports = WebsocketLoadBalancer = if err? return logger.err {room: message.room_id, err}, "failed to get room clients" logger.log {channel:channel, message: message.message, room_id: message.room_id, message_id: message._id, socketIoClients: clientIds}, "refreshing client list" - for clientId in clientIds when io.sockets.connected[clientId] # filter disconnected clients + for clientId in clientIds ConnectedUsersManager.refreshClient(message.room_id, clientId) else if message.room_id? if message._id? && Settings.checkEventOrder @@ -86,7 +86,6 @@ module.exports = WebsocketLoadBalancer = clientList = clientIds .map((id) -> io.sockets.connected[id]) - .filter(Boolean) # filter disconnected clients .filter((client) -> !(is_restricted_message && client.ol_context['is_restricted_user']) ) From 73fcba8e26e1965ef1324909630598f3a7f96475 Mon Sep 17 00:00:00 2001 From: Jakob Ackermann Date: Tue, 28 Apr 2020 12:54:45 +0200 Subject: [PATCH 4/4] [RoomManagerTests] add unit tests for getClientsInRoomSync --- test/unit/coffee/RoomManagerTests.coffee | 64 +++++++++++++++++++++++- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/test/unit/coffee/RoomManagerTests.coffee b/test/unit/coffee/RoomManagerTests.coffee index 50512807..f65d4480 100644 --- a/test/unit/coffee/RoomManagerTests.coffee +++ b/test/unit/coffee/RoomManagerTests.coffee @@ -1,4 +1,5 @@ chai = require('chai') +expect = chai.expect should = chai.should() sinon = require("sinon") modulePath = "../../../app/js/RoomManager.js" @@ -19,9 +20,9 @@ describe 'RoomManager', -> @RoomEvents = @RoomManager.eventSource() sinon.spy(@RoomEvents, 'emit') sinon.spy(@RoomEvents, 'once') - + describe "joinProject", -> - + describe "when the project room is empty", -> beforeEach (done) -> @@ -262,3 +263,62 @@ describe 'RoomManager', -> it "should not emit any events", -> @RoomEvents.emit.called.should.equal false + + describe "getClientsInRoomSync", -> + beforeEach -> + @io = { + sockets: { + adapter: { + rooms: {}, + nsp: { + connected: {} + } + } + } + } + @room = "some-project-id" + + describe "when the room does not exist", -> + it "should return an empty array", -> + expect(@RoomManager.getClientsInRoomSync(@io, @room)).to.deep.equal([]) + + describe "when the room exists", -> + beforeEach -> + @io.sockets.adapter.rooms[@room] = {sockets: {}} + + describe "when nobody is in the room", -> + it "should return an empty array", -> + expect(@RoomManager.getClientsInRoomSync(@io, @room)) + .to.deep.equal([]) + + + describe "when a client is in the room", -> + beforeEach -> + @clientId = 'some-client-id' + @io.sockets.adapter.rooms[@room].sockets[@clientId] = true + + describe "when the client is not connected", -> + it "should return an empty array", -> + expect(@RoomManager.getClientsInRoomSync(@io, @room)) + .to.deep.equal([]) + + describe "when the client is connected", -> + beforeEach -> + @io.sockets.adapter.nsp.connected[@clientId] = { mock: 'client' } + + it "should return a list with the clientId", -> + expect(@RoomManager.getClientsInRoomSync(@io, @room)) + .to.deep.equal([@clientId]) + + describe "when two clients are in the room and are connected", -> + beforeEach -> + @clientId = 'some-client-id' + @io.sockets.adapter.rooms[@room].sockets[@clientId] = true + @clientId2 = 'some-client-id-2' + @io.sockets.adapter.rooms[@room].sockets[@clientId2] = true + @io.sockets.adapter.nsp.connected[@clientId] = { mock: 'client1' } + @io.sockets.adapter.nsp.connected[@clientId2] = { mock: 'client1' } + + it "should return a list with the two clientIds", -> + expect(@RoomManager.getClientsInRoomSync(@io, @room)) + .to.deep.equal([@clientId, @clientId2])