Skip to content

Commit

Permalink
Update remove client in erizoJS (lynckia#1638)
Browse files Browse the repository at this point in the history
  • Loading branch information
lodoyun authored Oct 5, 2020
1 parent 2d003ff commit 3067748
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 75 deletions.
14 changes: 6 additions & 8 deletions erizo_controller/erizoController/erizoController.js
Original file line number Diff line number Diff line change
Expand Up @@ -418,10 +418,13 @@ exports.deleteRoom = (roomId, callback) => {
return;
}

// delete all clients and their streams
room.forEachClient((client) => {
client.channel.disconnect();
});

// delete the remaining publishers (externalInputs)
if (!room.p2p) {
room.forEachClient((client) => {
client.removeSubscriptions();
});
room.streamManager.forEachPublishedStream((stream) => {
if (stream.hasAudio() || stream.hasVideo() || stream.hasScreen()) {
room.controller.removePublisher(stream.getID());
Expand All @@ -430,12 +433,7 @@ exports.deleteRoom = (roomId, callback) => {
room.streamManager.publishedStreams.clear();
}

room.forEachClient((client) => {
client.channel.disconnect();
});

rooms.deleteRoom(roomId);

updateMyState();
callback('Success');
};
Expand Down
24 changes: 8 additions & 16 deletions erizo_controller/erizoController/models/Client.js
Original file line number Diff line number Diff line change
Expand Up @@ -855,16 +855,6 @@ class Client extends events.EventEmitter {
callback();
}

removeSubscriptions() {
log.info(`message: removeSubscriptions, clientId: ${this.id}`);
this.room.streamManager.forEachPublishedStream((stream) => {
if (stream.hasAvSubscriber(this.id)) {
this.room.controller.removeSubscriber(this.id, stream.id);
stream.removeAvSubscriber(this.id);
}
});
}

onDisconnect() {
this.stopListeningToSocketEvents();
const timeStamp = new Date();
Expand All @@ -887,17 +877,17 @@ class Client extends events.EventEmitter {
}
});


if (this.room.controller) {
this.removeSubscriptions();
}
this.room.streamManager.forEachPublishedStream((stream) => {
if (stream.hasAvSubscriber(this.id)) {
stream.removeAvSubscriber(this.id);
}
});

this.room.streamManager.forEachPublishedStream((stream) => {
if (stream.getClientId() === this.id) {
if (stream.hasAudio() || stream.hasVideo() || stream.hasScreen()) {
if (!this.room.p2p) {
log.info('message: Unpublishing stream, streamId:', stream.id);
this.room.controller.removePublisher(this.id, stream.id);
if (global.config.erizoController.report.session_events) {
this.room.amqper.broadcast('event', { room: this.room.id,
user: this.id,
Expand All @@ -917,7 +907,9 @@ class Client extends events.EventEmitter {
type: 'user_disconnection',
timestamp: timeStamp.getTime() });
}
this.room.removeClient(this.id);
if (!this.room.p2p) {
this.room.removeClient(this.id);
}
this.emit('disconnect');
}
}
Expand Down
102 changes: 56 additions & 46 deletions erizo_controller/erizoJS/erizoJSController.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ const RovReplManager = require('./../common/ROV/rovReplManager').RovReplManager;
const Client = require('./models/Client').Client;
const Publisher = require('./models/Publisher').Publisher;
const ExternalInput = require('./models/Publisher').ExternalInput;
const PublisherManager = require('./models/PublisherManager').PublisherManager;

// Logger
const log = logger.getLogger('ErizoJSController');

exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
const that = {};
// {streamId1: Publisher, streamId2: Publisher}
const publishers = {};
const publisherManager = new PublisherManager();
// {clientId: Client}
const clients = new Map();
const replManager = new RovReplManager(that);
Expand Down Expand Up @@ -61,17 +62,10 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
};


that.publishers = publishers;
that.publisherManager = publisherManager;
that.ioThreadPool = io;
initMetrics();

const forEachPublisher = (action) => {
const publisherStreamIds = Object.keys(publishers);
for (let i = 0; i < publisherStreamIds.length; i += 1) {
action(publisherStreamIds[i], publishers[publisherStreamIds[i]]);
}
};

const onAdaptSchemeNotify = (callbackRpc, type, message) => {
callbackRpc(type, message);
};
Expand Down Expand Up @@ -139,14 +133,28 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
return;
}

log.info(`message: removeClient - closing all connections on, clientId: ${clientId}`);
client.closeAllConnections();
clients.delete(client.id);
callback('callback', true);
if (clients.size === 0) {
log.info('message: Removed all clients. Process will exit');
process.exit(0);
}
log.info(`message: removeClient - removingPublished streams, clientId: ${clientId}`);
const publishers = publisherManager.getPublishersByClientId(clientId);

const removePromises = [];

publishers.forEach((pub) => {
log.info(`message: removeClient - removingPublished stream, clientId: ${clientId}, streamId: ${pub.streamId}`);
removePromises.push(that.removePublisher(clientId, pub.streamId));
});

removePromises.push(that.removeSubscriptions(clientId));

Promise.all(removePromises).then(() => {
log.info(`message: removeClient - closing all connections on, clientId: ${clientId}`);
client.closeAllConnections();
clients.delete(client.id);
callback('callback', true);
if (clients.size === 0) {
log.info('message: Removed all clients. Process will exit');
process.exit(0);
}
});
};

that.rovMessage = (args, callback) => {
Expand All @@ -155,10 +163,10 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {

that.addExternalInput = (erizoControllerId, streamId, url, label, callbackRpc) => {
updateUptimeInfo();
if (publishers[streamId] === undefined) {
if (!publisherManager.has(streamId)) {
const client = getOrCreateClient(erizoControllerId, url);
publishers[streamId] = new ExternalInput(url, streamId, label, threadPool);
const ei = publishers[streamId];
const ei = new ExternalInput(url, streamId, label, threadPool);
publisherManager.add(streamId, ei);
const answer = ei.init();
// We add the connection manually to the client
client.addConnection(ei);
Expand All @@ -175,14 +183,16 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {

that.addExternalOutput = (streamId, url, options) => {
updateUptimeInfo();
if (publishers[streamId]) publishers[streamId].addExternalOutput(url, options);
if (publisherManager.has(streamId)) {
publisherManager.getPublisherById(streamId).addExternalOutput(url, options);
}
};

that.removeExternalOutput = (streamId, url) => {
if (publishers[streamId] !== undefined) {
if (publisherManager.has(streamId)) {
log.info('message: Stopping ExternalOutput, ' +
`id: ${publishers[streamId].getExternalOutput(url).id}`);
publishers[streamId].removeExternalOutput(url);
`id: ${publisherManager.getPublisherById(streamId).getExternalOutput(url).id}`);
publisherManager.getPublisherById(streamId).removeExternalOutput(url);
}
};

Expand Down Expand Up @@ -222,12 +232,12 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {

that.processStreamMessage = (erizoControllerId, clientId, streamId, msg) => {
let node;
const publisher = publishers[streamId];
if (!publisher) {
if (!publisherManager.has(streamId)) {
log.warn('message: Process Stream message stream not found, ' +
`clientId: ${clientId}, streamId: ${streamId}`);
return;
}
const publisher = publisherManager.getPublisherById(streamId);

if (publisher.clientId === clientId) {
node = publisher;
Expand Down Expand Up @@ -256,7 +266,7 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
log.info('addPublisher, clientId', clientId, 'streamId', streamId);
const client = getOrCreateClient(erizoControllerId, clientId, options);

if (publishers[streamId] === undefined) {
if (!publisherManager.has(streamId)) {
// eslint-disable-next-line no-param-reassign
options.publicIP = that.publicIP;
// eslint-disable-next-line no-param-reassign
Expand All @@ -268,7 +278,7 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
logger.objectToLog(options),
logger.objectToLog(options.metadata));
publisher = new Publisher(clientId, streamId, connection, options);
publishers[streamId] = publisher;
publisherManager.add(streamId, publisher);
publisher.initMediaStream();
publisher.on('callback', onAdaptSchemeNotify.bind(this, callbackRpc));
publisher.on('periodic_stats', onPeriodicStats.bind(this, streamId, undefined));
Expand Down Expand Up @@ -296,7 +306,7 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
});
}
} else {
publisher = publishers[streamId];
publisher = publisherManager.getPublisherById(streamId);
if (publisher.numSubscribers === 0) {
log.warn('message: publisher already set but no subscribers will ignore, ',
`code: ${WARN_CONFLICT}, streamId: ${streamId},`,
Expand All @@ -315,7 +325,7 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
*/
that.addSubscriber = (erizoControllerId, clientId, streamId, options, callbackRpc) => {
updateUptimeInfo();
const publisher = publishers[streamId];
const publisher = publisherManager.getPublisherById(streamId);
if (publisher === undefined) {
log.warn('message: addSubscriber to unknown publisher, ',
`code: ${WARN_NOT_FOUND}, streamId: ${streamId}, `,
Expand Down Expand Up @@ -392,7 +402,7 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
return;
}

const knownPublishers = streamIds.map(streamId => publishers[streamId])
const knownPublishers = streamIds.map(streamId => publisherManager.getPublisherById(streamId))
.filter(pub =>
pub !== undefined &&
!pub.getSubscriber(clientId));
Expand Down Expand Up @@ -454,7 +464,7 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
* Removes multiple subscribers from the room.
*/
that.removeMultipleSubscribers = (clientId, streamIds, callbackRpc) => {
const knownPublishers = streamIds.map(streamId => publishers[streamId])
const knownPublishers = streamIds.map(streamId => publisherManager.getPublisherById(streamId))
.filter(pub =>
pub !== undefined &&
pub.getSubscriber(clientId));
Expand Down Expand Up @@ -506,7 +516,7 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
*/
that.removePublisher = (clientId, streamId, callback = () => {}) =>
new Promise((resolve) => {
const publisher = publishers[streamId];
const publisher = publisherManager.getPublisherById(streamId);
if (publisher !== undefined) {
log.info(`message: Removing publisher, id: ${clientId}, streamId: ${streamId},`,
logger.objectToLog(publisher.options), logger.objectToLog(publisher.options.metadata));
Expand All @@ -518,13 +528,13 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
publisher.removeSubscriber(subscriberId);
});
publisher.removeExternalOutputs().then(() => {
delete publishers[streamId];
publisherManager.remove(streamId);
closeNode(publisher).then(() => {
publisher.muxer.close((message) => {
log.info('message: muxer closed succesfully, ',
`id: ${streamId},`,
logger.objectToLog(message));
const count = Object.keys(publishers).length;
const count = publisherManager.getPublisherCount();
log.debug(`message: remaining publishers, publisherCount: ${count}`);
callback('callback', true);
resolve();
Expand All @@ -546,7 +556,7 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
* This also removes it from the associated OneToManyProcessor.
*/
that.removeSubscriber = (clientId, streamId, callback = () => {}) => {
const publisher = publishers[streamId];
const publisher = publisherManager.getPublisherById(streamId);
if (publisher && publisher.hasSubscriber(clientId)) {
const subscriber = publisher.getSubscriber(clientId);
log.info(`message: removing subscriber, streamId: ${subscriber.streamId}, ` +
Expand All @@ -570,14 +580,15 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
log.info('message: removing subscriptions, clientId:', clientId);
// we go through all the connections in the client and we close them
const closePromises = [];
forEachPublisher((publisherId, publisher) => {
that.publisherManager.forEach((publisher) => {
const subscriber = publisher.getSubscriber(clientId);
if (subscriber) {
log.debug('message: removing subscription, ' +
'id:', subscriber.clientId, ',',
logger.objectToLog(subscriber.options), logger.objectToLog(subscriber.options.metadata));
closePromises.push(closeNode(subscriber));
publisher.removeSubscriber(clientId);
closePromises.push(closeNode(subscriber).then(() => {
publisher.removeSubscriber(clientId);
}));
}
});
return Promise.all(closePromises);
Expand All @@ -588,8 +599,8 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
let publisher;
log.debug(`message: Requested stream stats, streamID: ${streamId}`);
const promises = [];
if (streamId && publishers[streamId]) {
publisher = publishers[streamId];
if (streamId && publisherManager.has(streamId)) {
publisher = publisherManager.getPublisherById(streamId);
promises.push(publisher.getStats('publisher', stats));

publisher.forEachSubscriber((subscriberId, subscriber) => {
Expand All @@ -608,8 +619,8 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
let publisher;
log.debug(`message: Requested subscription to stream stats, streamId: ${streamId}`);

if (streamId && publishers[streamId]) {
publisher = publishers[streamId];
if (streamId && publisherManager.has(streamId)) {
publisher = publisherManager.getPublisherById(streamId);

if (global.config.erizoController.reportSubscriptions &&
global.config.erizoController.reportSubscriptions.maxSubscriptions > 0) {
Expand Down Expand Up @@ -667,10 +678,9 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
const metrics = Object.assign({}, that.metrics);
metrics.totalConnections = 0;
metrics.connectionLevels = Array(10).fill(0);
metrics.publishers = Object.keys(that.publishers).length;
metrics.publishers = publisherManager.getPublisherCount();
let subscribers = 0;
Object.keys(that.publishers).forEach((streamId) => {
const publisher = that.publishers[streamId];
publisherManager.forEach((publisher) => {
subscribers += publisher.numSubscribers;
});
metrics.subscribers = subscribers;
Expand Down
41 changes: 41 additions & 0 deletions erizo_controller/erizoJS/models/PublisherManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
class PublisherManager {
constructor() {
// streamId: Publisher
this.publisherNodes = new Map();
}

add(streamId, publisherNode) {
this.publisherNodes.set(streamId, publisherNode);
}

remove(id) {
return this.publisherNodes.delete(id);
}

forEach(doSomething) {
this.publisherNodes.forEach((publisherNode) => {
doSomething(publisherNode);
});
}

getPublisherById(id) {
return this.publisherNodes.get(id);
}

getPublishersByClientId(clientId) {
const nodes = this.publisherNodes.values();
const publisherNodes = Array.from(nodes).filter(node => node.clientId === clientId);
return publisherNodes;
}

has(id) {
return this.publisherNodes.has(id);
}

getPublisherCount() {
return this.publisherNodes.size;
}
}

exports.PublisherManager = PublisherManager;

1 change: 0 additions & 1 deletion erizo_controller/test/erizoController/Client.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ describe('Erizo Controller / Client', () => {
expect(client.sendMessage).not.to.be.undefined;
expect(client.on).not.to.be.undefined;
expect(client.setNewChannel).not.to.be.undefined;
expect(client.removeSubscriptions).not.to.be.undefined;
expect(client.disconnect).not.to.be.undefined;
});

Expand Down
Loading

0 comments on commit 3067748

Please sign in to comment.