Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Erizo Controller refactoring to add Clients and Channels #1041

Merged
merged 8 commits into from
Sep 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions erizo_controller/common/amqper.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,14 @@ exports.connect = function(callback) {
}
} catch(err) {
log.error('message: error processing message, ' +
'queueName: ' + clientQueue.name + ', ' +
logger.objectToLog(err));
'queueName: ' + clientQueue.name + ', error: ' + err.message);
}
});

});
} catch (err) {
log.error('message: exchange error, ' +
'exchangeName: ' + exchange.name + ', ' +
logger.objectToLog(err));
'exchangeName: ' + exchange.name + ', error: ' + err.message);
}
});

Expand Down Expand Up @@ -127,15 +125,13 @@ exports.bind = function(id, callback) {
rpcPublic[message.method].apply(rpcPublic, message.args);
} catch (error) {
log.error('message: error processing call, ' +
'queueName: ' + q.name + ', ' +
logger.objectToLog(error));
'queueName: ' + q.name + ', error: ' + error.message);
}

});
} catch (err) {
log.error('message: queue error, ' +
'queueName: ' + q.name + ', ' +
logger.objectToLog(err));
'queueName: ' + q.name + ', error: ' + err.message);
}

});
Expand All @@ -161,16 +157,23 @@ exports.bindBroadcast = function(id, callback) {
}
if (body.message.method && rpcPublic[body.message.method]) {
body.message.args.push(answer);
rpcPublic[body.message.method].apply(rpcPublic, body.message.args);
try {
rpcPublic[body.message.method].apply(rpcPublic, body.message.args);
} catch(e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

log.warn('message: error processing call, error:', e.message);
}
} else {
try {
callback(body.message, answer);
} catch(e) {
log.warn('message: error processing callback, error:', e.message);
}
}
});

} catch (err) {
log.error('message: exchange error, ' +
'queueName: ' + q.name + ', ' +
logger.objectToLog(err));
'queueName: ' + q.name + ', error: ' + err.message);
}

});
Expand Down
2 changes: 2 additions & 0 deletions erizo_controller/common/logger.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict';
var log4js = require('log4js');

global.config = global.config || {};

global.config.logger = global.config.logger || {};

var logFile = global.config.logger.configFile || '../log4js_configuration.json';
Expand Down
19 changes: 19 additions & 0 deletions erizo_controller/erizoClient/src/Room.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const Room = (altIo, altConnection, specInput) => {
that.Connection = altConnection === undefined ? Connection : altConnection;

let socket = Socket(altIo);
that.socket = socket;
let remoteStreams = that.remoteStreams;
let localStreams = that.localStreams;

Expand Down Expand Up @@ -139,6 +140,16 @@ const Room = (altIo, altConnection, specInput) => {
connection.createOffer();
};

const removeLocalStreamP2PConnection = (streamInput, peerSocket) => {
const stream = streamInput;
if (stream.pc === undefined || !stream.pc.has(peerSocket)) {
return;
}
const pc = stream.pc.get(peerSocket);
pc.close();
stream.pc.remove(peerSocket);
};

const getErizoConnectionOptions = (stream, options, isRemote) => {
const connectionOpts = {
callback(message) {
Expand Down Expand Up @@ -240,6 +251,13 @@ const Room = (altIo, altConnection, specInput) => {
createLocalStreamP2PConnection(myStream, arg.peerSocket);
};

const socketOnUnpublishMe = (arg) => {
const myStream = localStreams.get(arg.streamId);
if (myStream) {
removeLocalStreamP2PConnection(myStream, arg.peerSocket);
}
};

const socketOnBandwidthAlert = (arg) => {
Logger.info('Bandwidth Alert on', arg.streamID, 'message',
arg.message, 'BW:', arg.bandwidth);
Expand Down Expand Up @@ -814,6 +832,7 @@ const Room = (altIo, altConnection, specInput) => {
socket.on('signaling_message_erizo', socketEventToArgs.bind(null, socketOnErizoMessage));
socket.on('signaling_message_peer', socketEventToArgs.bind(null, socketOnPeerMessage));
socket.on('publish_me', socketEventToArgs.bind(null, socketOnPublishMe));
socket.on('unpublish_me', socketEventToArgs.bind(null, socketOnUnpublishMe));
socket.on('onBandwidthAlert', socketEventToArgs.bind(null, socketOnBandwidthAlert));
socket.on('onDataStream', socketEventToArgs.bind(null, socketOnDataStream));
socket.on('onUpdateAttributeStream', socketEventToArgs.bind(null, socketOnUpdateAttributeStream));
Expand Down
95 changes: 87 additions & 8 deletions erizo_controller/erizoClient/src/Socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ const SocketEvent = (type, specInput) => {
const Socket = (newIo) => {
const that = EventDispatcher();
const defaultCallback = () => {};
const messageBuffer = [];

that.CONNECTED = Symbol('connected');
that.RECONNECTING = Symbol('reconnecting');
that.DISCONNECTED = Symbol('disconnected');

const WEBSOCKET_NORMAL_CLOSURE = 1000;
that.state = that.DISCONNECTED;
that.IO = newIo === undefined ? io : newIo;

Expand All @@ -31,22 +34,47 @@ const Socket = (newIo) => {
that.emit(SocketEvent(type, { args }));
};

const addToBuffer = (type, message, callback, error) => {
messageBuffer.push([type, message, callback, error]);
};

const flushBuffer = () => {
if (that.state !== that.CONNECTED) {
return;
}
messageBuffer.forEach((message) => {
that.sendMessage(...message);
});
};

that.connect = (token, callback = defaultCallback, error = defaultCallback) => {
const options = {
reconnect: false,
reconnection: true,
reconnectionAttempts: 3,
secure: token.secure,
forceNew: true,
transports: ['websocket'],
rejectUnauthorized: false,
};
const transport = token.secure ? 'wss://' : 'ws://';
socket = that.IO.connect(transport + token.host, options);

const host = token.host;
socket = that.IO.connect(transport + host, options);

// Hack to know the exact reason of the WS closure (socket.io does not publish it)
let closeCode = WEBSOCKET_NORMAL_CLOSURE;
const socketOnCloseFunction = socket.io.engine.transport.ws.onclose;
socket.io.engine.transport.ws.onclose = (closeEvent) => {
Logger.warning('WebSocket closed, code:', closeEvent.code);
closeCode = closeEvent.code;
socketOnCloseFunction(closeEvent);
};
that.socket = socket;
socket.on('onAddStream', emit.bind(that, 'onAddStream'));

socket.on('signaling_message_erizo', emit.bind(that, 'signaling_message_erizo'));
socket.on('signaling_message_peer', emit.bind(that, 'signaling_message_peer'));
socket.on('publish_me', emit.bind(that, 'publish_me'));
socket.on('unpublish_me', emit.bind(that, 'unpublish_me'));
socket.on('onBandwidthAlert', emit.bind(that, 'onBandwidthAlert'));

// We receive an event of new data in one of the streams
Expand All @@ -59,15 +87,62 @@ const Socket = (newIo) => {
socket.on('onRemoveStream', emit.bind(that, 'onRemoveStream'));

// The socket has disconnected
socket.on('disconnect', emit.bind(that, 'disconnect'));
socket.on('disconnect', (reason) => {
Logger.debug('disconnect', that.id, reason);
if (closeCode !== WEBSOCKET_NORMAL_CLOSURE) {
that.state = that.RECONNECTING;
return;
}
emit.bind(that, 'disconnect');
socket.close();
});

socket.on('connection_failed', emit.bind(that, 'connection_failed'));
socket.on('error', emit.bind(that, 'error'));
socket.on('connection_failed', () => {
Logger.error('connection failed, id:', that.id);
emit.bind(that, 'connection_failed');
});
socket.on('error', (err) => {
Logger.warning('socket error, id:', that.id, ', error:', err.message);
emit.bind(that, 'error');
});
socket.on('connect_error', (err) => {
Logger.warning('connect error, id:', that.id, ', error:', err.message);
});

socket.on('connect_timeout', (err) => {
Logger.warning('connect timeout, id:', that.id, ', error:', err.message);
});

socket.on('reconnecting', (attemptNumber) => {
Logger.debug('reconnecting, id:', that.id, ', attempet:', attemptNumber);
});

socket.on('reconnect', (attemptNumber) => {
Logger.debug('reconnected, id:', that.id, ', attempet:', attemptNumber);
that.state = that.CONNECTED;
socket.emit('reconnected', that.id);
flushBuffer();
});

socket.on('reconnect_attempt', (attemptNumber) => {
Logger.debug('reconnect attempt, id:', that.id, ', attempet:', attemptNumber);
});

socket.on('reconnect_error', (err) => {
Logger.debug('error reconnecting, id:', that.id, ', error:', err.message);
});

socket.on('reconnect_failed', () => {
Logger.warning('reconnect failed, id:', that.id);
that.state = that.DISCONNECTED;
emit.bind(that, 'disconnect');
});

// First message with the token
that.sendMessage('token', token, (...args) => {
that.sendMessage('token', token, (response) => {
that.state = that.CONNECTED;
callback(...args);
that.id = response.clientId;
callback(response);
}, error);
};

Expand All @@ -82,6 +157,10 @@ const Socket = (newIo) => {
Logger.error('Trying to send a message over a disconnected Socket');
return;
}
if (that.state === that.RECONNECTING) {
addToBuffer(type, msg, callback, error);
return;
}
socket.emit(type, msg, (respType, resp) => {
if (respType === 'success') {
callback(resp);
Expand Down
Loading