Skip to content

Commit

Permalink
Erizo Controller refactoring to add Clients and Channels (#1041)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcague authored Sep 28, 2017
1 parent b4b1e8b commit 3ddccc2
Show file tree
Hide file tree
Showing 13 changed files with 1,836 additions and 1,541 deletions.
25 changes: 14 additions & 11 deletions erizo_controller/common/amqper.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,14 @@ exports.connect = function(callback) {
}
} catch(err) {
log.error('message: error processing message, ' +
'queueName: ' + clientQueue.name + ', ' +
logger.objectToLog(err));
'queueName: ' + clientQueue.name + ', error: ' + err.message);
}
});

});
} catch (err) {
log.error('message: exchange error, ' +
'exchangeName: ' + exchange.name + ', ' +
logger.objectToLog(err));
'exchangeName: ' + exchange.name + ', error: ' + err.message);
}
});

Expand Down Expand Up @@ -127,15 +125,13 @@ exports.bind = function(id, callback) {
rpcPublic[message.method].apply(rpcPublic, message.args);
} catch (error) {
log.error('message: error processing call, ' +
'queueName: ' + q.name + ', ' +
logger.objectToLog(error));
'queueName: ' + q.name + ', error: ' + error.message);
}

});
} catch (err) {
log.error('message: queue error, ' +
'queueName: ' + q.name + ', ' +
logger.objectToLog(err));
'queueName: ' + q.name + ', error: ' + err.message);
}

});
Expand All @@ -161,16 +157,23 @@ exports.bindBroadcast = function(id, callback) {
}
if (body.message.method && rpcPublic[body.message.method]) {
body.message.args.push(answer);
rpcPublic[body.message.method].apply(rpcPublic, body.message.args);
try {
rpcPublic[body.message.method].apply(rpcPublic, body.message.args);
} catch(e) {
log.warn('message: error processing call, error:', e.message);
}
} else {
try {
callback(body.message, answer);
} catch(e) {
log.warn('message: error processing callback, error:', e.message);
}
}
});

} catch (err) {
log.error('message: exchange error, ' +
'queueName: ' + q.name + ', ' +
logger.objectToLog(err));
'queueName: ' + q.name + ', error: ' + err.message);
}

});
Expand Down
2 changes: 2 additions & 0 deletions erizo_controller/common/logger.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict';
var log4js = require('log4js');

global.config = global.config || {};

global.config.logger = global.config.logger || {};

var logFile = global.config.logger.configFile || '../log4js_configuration.json';
Expand Down
19 changes: 19 additions & 0 deletions erizo_controller/erizoClient/src/Room.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const Room = (altIo, altConnection, specInput) => {
that.Connection = altConnection === undefined ? Connection : altConnection;

let socket = Socket(altIo);
that.socket = socket;
let remoteStreams = that.remoteStreams;
let localStreams = that.localStreams;

Expand Down Expand Up @@ -139,6 +140,16 @@ const Room = (altIo, altConnection, specInput) => {
connection.createOffer();
};

const removeLocalStreamP2PConnection = (streamInput, peerSocket) => {
const stream = streamInput;
if (stream.pc === undefined || !stream.pc.has(peerSocket)) {
return;
}
const pc = stream.pc.get(peerSocket);
pc.close();
stream.pc.remove(peerSocket);
};

const getErizoConnectionOptions = (stream, options, isRemote) => {
const connectionOpts = {
callback(message) {
Expand Down Expand Up @@ -240,6 +251,13 @@ const Room = (altIo, altConnection, specInput) => {
createLocalStreamP2PConnection(myStream, arg.peerSocket);
};

const socketOnUnpublishMe = (arg) => {
const myStream = localStreams.get(arg.streamId);
if (myStream) {
removeLocalStreamP2PConnection(myStream, arg.peerSocket);
}
};

const socketOnBandwidthAlert = (arg) => {
Logger.info('Bandwidth Alert on', arg.streamID, 'message',
arg.message, 'BW:', arg.bandwidth);
Expand Down Expand Up @@ -814,6 +832,7 @@ const Room = (altIo, altConnection, specInput) => {
socket.on('signaling_message_erizo', socketEventToArgs.bind(null, socketOnErizoMessage));
socket.on('signaling_message_peer', socketEventToArgs.bind(null, socketOnPeerMessage));
socket.on('publish_me', socketEventToArgs.bind(null, socketOnPublishMe));
socket.on('unpublish_me', socketEventToArgs.bind(null, socketOnUnpublishMe));
socket.on('onBandwidthAlert', socketEventToArgs.bind(null, socketOnBandwidthAlert));
socket.on('onDataStream', socketEventToArgs.bind(null, socketOnDataStream));
socket.on('onUpdateAttributeStream', socketEventToArgs.bind(null, socketOnUpdateAttributeStream));
Expand Down
95 changes: 87 additions & 8 deletions erizo_controller/erizoClient/src/Socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ const SocketEvent = (type, specInput) => {
const Socket = (newIo) => {
const that = EventDispatcher();
const defaultCallback = () => {};
const messageBuffer = [];

that.CONNECTED = Symbol('connected');
that.RECONNECTING = Symbol('reconnecting');
that.DISCONNECTED = Symbol('disconnected');

const WEBSOCKET_NORMAL_CLOSURE = 1000;
that.state = that.DISCONNECTED;
that.IO = newIo === undefined ? io : newIo;

Expand All @@ -31,22 +34,47 @@ const Socket = (newIo) => {
that.emit(SocketEvent(type, { args }));
};

const addToBuffer = (type, message, callback, error) => {
messageBuffer.push([type, message, callback, error]);
};

const flushBuffer = () => {
if (that.state !== that.CONNECTED) {
return;
}
messageBuffer.forEach((message) => {
that.sendMessage(...message);
});
};

that.connect = (token, callback = defaultCallback, error = defaultCallback) => {
const options = {
reconnect: false,
reconnection: true,
reconnectionAttempts: 3,
secure: token.secure,
forceNew: true,
transports: ['websocket'],
rejectUnauthorized: false,
};
const transport = token.secure ? 'wss://' : 'ws://';
socket = that.IO.connect(transport + token.host, options);

const host = token.host;
socket = that.IO.connect(transport + host, options);

// Hack to know the exact reason of the WS closure (socket.io does not publish it)
let closeCode = WEBSOCKET_NORMAL_CLOSURE;
const socketOnCloseFunction = socket.io.engine.transport.ws.onclose;
socket.io.engine.transport.ws.onclose = (closeEvent) => {
Logger.warning('WebSocket closed, code:', closeEvent.code);
closeCode = closeEvent.code;
socketOnCloseFunction(closeEvent);
};
that.socket = socket;
socket.on('onAddStream', emit.bind(that, 'onAddStream'));

socket.on('signaling_message_erizo', emit.bind(that, 'signaling_message_erizo'));
socket.on('signaling_message_peer', emit.bind(that, 'signaling_message_peer'));
socket.on('publish_me', emit.bind(that, 'publish_me'));
socket.on('unpublish_me', emit.bind(that, 'unpublish_me'));
socket.on('onBandwidthAlert', emit.bind(that, 'onBandwidthAlert'));

// We receive an event of new data in one of the streams
Expand All @@ -59,15 +87,62 @@ const Socket = (newIo) => {
socket.on('onRemoveStream', emit.bind(that, 'onRemoveStream'));

// The socket has disconnected
socket.on('disconnect', emit.bind(that, 'disconnect'));
socket.on('disconnect', (reason) => {
Logger.debug('disconnect', that.id, reason);
if (closeCode !== WEBSOCKET_NORMAL_CLOSURE) {
that.state = that.RECONNECTING;
return;
}
emit.bind(that, 'disconnect');
socket.close();
});

socket.on('connection_failed', emit.bind(that, 'connection_failed'));
socket.on('error', emit.bind(that, 'error'));
socket.on('connection_failed', () => {
Logger.error('connection failed, id:', that.id);
emit.bind(that, 'connection_failed');
});
socket.on('error', (err) => {
Logger.warning('socket error, id:', that.id, ', error:', err.message);
emit.bind(that, 'error');
});
socket.on('connect_error', (err) => {
Logger.warning('connect error, id:', that.id, ', error:', err.message);
});

socket.on('connect_timeout', (err) => {
Logger.warning('connect timeout, id:', that.id, ', error:', err.message);
});

socket.on('reconnecting', (attemptNumber) => {
Logger.debug('reconnecting, id:', that.id, ', attempet:', attemptNumber);
});

socket.on('reconnect', (attemptNumber) => {
Logger.debug('reconnected, id:', that.id, ', attempet:', attemptNumber);
that.state = that.CONNECTED;
socket.emit('reconnected', that.id);
flushBuffer();
});

socket.on('reconnect_attempt', (attemptNumber) => {
Logger.debug('reconnect attempt, id:', that.id, ', attempet:', attemptNumber);
});

socket.on('reconnect_error', (err) => {
Logger.debug('error reconnecting, id:', that.id, ', error:', err.message);
});

socket.on('reconnect_failed', () => {
Logger.warning('reconnect failed, id:', that.id);
that.state = that.DISCONNECTED;
emit.bind(that, 'disconnect');
});

// First message with the token
that.sendMessage('token', token, (...args) => {
that.sendMessage('token', token, (response) => {
that.state = that.CONNECTED;
callback(...args);
that.id = response.clientId;
callback(response);
}, error);
};

Expand All @@ -82,6 +157,10 @@ const Socket = (newIo) => {
Logger.error('Trying to send a message over a disconnected Socket');
return;
}
if (that.state === that.RECONNECTING) {
addToBuffer(type, msg, callback, error);
return;
}
socket.emit(type, msg, (respType, resp) => {
if (respType === 'success') {
callback(resp);
Expand Down
Loading

0 comments on commit 3ddccc2

Please sign in to comment.