Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leaks and optimize code #1636

Merged
merged 3 commits into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions erizo/src/erizo/WebRtcConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ void WebRtcConnection::syncClose() {
ELOG_DEBUG("%s message: Close ended", toLog());
}

void WebRtcConnection::close() {
boost::future<void> WebRtcConnection::close() {
ELOG_DEBUG("%s message: Async close called", toLog());
std::shared_ptr<WebRtcConnection> shared_this = shared_from_this();
asyncTask([shared_this] (std::shared_ptr<WebRtcConnection> connection) {
return asyncTask([shared_this] (std::shared_ptr<WebRtcConnection> connection) {
shared_this->syncClose();
});
}
Expand Down
2 changes: 1 addition & 1 deletion erizo/src/erizo/WebRtcConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class WebRtcConnection: public TransportListener, public LogContext, public Hand
* @return True if the candidates are gathered.
*/
bool init();
void close();
boost::future<void> close();
void syncClose();

boost::future<void> setRemoteSdpInfo(std::shared_ptr<SdpInfo> sdp, int received_session_version);
Expand Down
2 changes: 2 additions & 0 deletions erizo/src/erizo/dtls/DtlsClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,8 @@ int createCert(const std::string& pAor, int expireDays, int keyLen, X509*& outCe
delete[] client_key_buffer;
delete[] server_key_buffer;
delete keys;
delete client_key;
delete server_key;

srtp_profile = mSocket->getSrtpProfile();

Expand Down
2 changes: 1 addition & 1 deletion erizoAPI/MediaStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ NAN_METHOD(MediaStream::close) {
obj->Ref();
obj->close().then(
[persistent, obj] (boost::future<void>) {
ELOG_DEBUG("%s, MediaStream Close is finishied, resolving promise", obj->toLog());
ELOG_DEBUG("%s, MediaStream Close is finished, resolving promise", obj->toLog());
obj->notifyFuture(persistent);
});
info.GetReturnValue().Set(resolver->GetPromise());
Expand Down
115 changes: 83 additions & 32 deletions erizoAPI/WebRtcConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,24 +69,11 @@ WebRtcConnection::WebRtcConnection() : closed_{false}, id_{"undefined"} {

WebRtcConnection::~WebRtcConnection() {
close();
delete event_callback_;
ELOG_DEBUG("%s, message: Destroyed", toLog());
}

void WebRtcConnection::close() {
ELOG_DEBUG("%s, message: Trying to close", toLog());
if (closed_) {
ELOG_DEBUG("%s, message: Already closed", toLog());
return;
}
ELOG_DEBUG("%s, message: Closing", toLog());
if (me) {
me->setWebRtcConnectionEventListener(nullptr);
me->close();
me.reset();
}

boost::mutex::scoped_lock lock(mutex);

void WebRtcConnection::closeEvents() {
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(async_))) {
ELOG_DEBUG("%s, message: Closing handle", toLog());
uv_close(reinterpret_cast<uv_handle_t*>(async_), destroyWebRtcConnectionAsyncHandle);
Expand All @@ -98,8 +85,29 @@ void WebRtcConnection::close() {
}
async_ = nullptr;
future_async_ = nullptr;
ELOG_DEBUG("%s, message: Closed, pendingRefs: %d", toLog(), refs_);
jcague marked this conversation as resolved.
Show resolved Hide resolved
}

boost::future<std::string> WebRtcConnection::close() {
auto close_promise = std::make_shared<boost::promise<std::string>>();
ELOG_DEBUG("%s, message: Trying to close", toLog());
if (closed_) {
ELOG_DEBUG("%s, message: Already closed", toLog());
close_promise->set_value("");
return close_promise->get_future();
}
closed_ = true;

ELOG_DEBUG("%s, message: Closing", toLog());
if (me) {
me->setWebRtcConnectionEventListener(nullptr);
me->close().then([this, close_promise] (boost::future<void>) {
close_promise->set_value(std::string("webrtcconnection_closed"));
me.reset();
});
}
ELOG_DEBUG("%s, message: Closed", toLog());
return close_promise->get_future();
}

std::string WebRtcConnection::toLog() {
Expand Down Expand Up @@ -252,6 +260,7 @@ NAN_METHOD(WebRtcConnection::New) {
rtp_mappings, ext_mappings, enable_connection_quality_check,
obj);
obj->Wrap(info.This());
obj->Ref();
info.GetReturnValue().Set(info.This());
} else {
// TODO(pedro) Check what happens here
Expand All @@ -260,7 +269,22 @@ NAN_METHOD(WebRtcConnection::New) {

NAN_METHOD(WebRtcConnection::close) {
WebRtcConnection* obj = Nan::ObjectWrap::Unwrap<WebRtcConnection>(info.Holder());
obj->close();
std::shared_ptr<erizo::WebRtcConnection> me = obj->me;
v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
if (!me) {
resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing();
info.GetReturnValue().Set(resolver->GetPromise());
return;
}

Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);
obj->close().then(
[persistent, obj] (boost::future<std::string> fut) {
ELOG_DEBUG("%s, message: WebRTCConnection Close is finished, resolving promise", obj->toLog());
ResultVariant result = fut.get();
obj->notifyFuture(persistent, result);
});
info.GetReturnValue().Set(resolver->GetPromise());
}

NAN_METHOD(WebRtcConnection::init) {
Expand All @@ -279,7 +303,10 @@ NAN_METHOD(WebRtcConnection::init) {
NAN_METHOD(WebRtcConnection::createOffer) {
WebRtcConnection* obj = Nan::ObjectWrap::Unwrap<WebRtcConnection>(info.Holder());
std::shared_ptr<erizo::WebRtcConnection> me = obj->me;
v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
if (!me) {
resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing();
info.GetReturnValue().Set(resolver->GetPromise());
return;
}

Expand All @@ -290,9 +317,8 @@ NAN_METHOD(WebRtcConnection::createOffer) {
bool audio_enabled = Nan::To<bool>(info[1]).FromJust();
bool bundle = Nan::To<bool>(info[2]).FromJust();

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);
obj->Ref();

me->createOffer(video_enabled, audio_enabled, bundle).then(
[persistent, obj] (boost::future<void>) {
obj->notifyFuture(persistent);
Expand Down Expand Up @@ -331,8 +357,11 @@ NAN_METHOD(WebRtcConnection::setMetadata) {
NAN_METHOD(WebRtcConnection::setRemoteDescription) {
WebRtcConnection* obj = Nan::ObjectWrap::Unwrap<WebRtcConnection>(info.Holder());
std::shared_ptr<erizo::WebRtcConnection> me = obj->me;

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
if (!me) {
info.GetReturnValue().Set(Nan::New(false));
resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing();
info.GetReturnValue().Set(resolver->GetPromise());
return;
}

Expand All @@ -341,10 +370,8 @@ NAN_METHOD(WebRtcConnection::setRemoteDescription) {
int received_session_version = Nan::To<int>(info[1]).FromJust();
auto sdp = std::make_shared<erizo::SdpInfo>(*param->me.get());

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);

obj->Ref();
me->setRemoteSdpInfo(sdp, received_session_version).then(
[persistent, obj] (boost::future<void>) {
obj->notifyFuture(persistent);
Expand All @@ -356,13 +383,16 @@ NAN_METHOD(WebRtcConnection::setRemoteDescription) {
NAN_METHOD(WebRtcConnection::getLocalDescription) {
WebRtcConnection* obj = Nan::ObjectWrap::Unwrap<WebRtcConnection>(info.Holder());
std::shared_ptr<erizo::WebRtcConnection> me = obj->me;

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
if (!me) {
resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing();
info.GetReturnValue().Set(resolver->GetPromise());
return;
}
v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);
obj->Ref();

Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);
ELOG_DEBUG("%s, message: getLocalDescription", obj->toLog());
me->getLocalSdpInfo().then(
[persistent, obj] (boost::future<std::shared_ptr<erizo::SdpInfo>> fut) {
std::shared_ptr<erizo::SdpInfo> sdp_info = fut.get();
Expand Down Expand Up @@ -474,16 +504,19 @@ NAN_METHOD(WebRtcConnection::getConnectionQualityLevel) {
NAN_METHOD(WebRtcConnection::addMediaStream) {
WebRtcConnection* obj = Nan::ObjectWrap::Unwrap<WebRtcConnection>(info.Holder());
std::shared_ptr<erizo::WebRtcConnection> me = obj->me;

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
if (!me) {
resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing();
info.GetReturnValue().Set(resolver->GetPromise());
return;
}

MediaStream* param = Nan::ObjectWrap::Unwrap<MediaStream>(Nan::To<v8::Object>(info[0]).ToLocalChecked());
auto ms = std::shared_ptr<erizo::MediaStream>(param->me);

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);
obj->Ref();

me->addMediaStream(ms).then(
[persistent, obj] (boost::future<void>) {
obj->notifyFuture(persistent);
Expand All @@ -495,16 +528,19 @@ NAN_METHOD(WebRtcConnection::addMediaStream) {
NAN_METHOD(WebRtcConnection::removeMediaStream) {
WebRtcConnection* obj = Nan::ObjectWrap::Unwrap<WebRtcConnection>(info.Holder());
std::shared_ptr<erizo::WebRtcConnection> me = obj->me;

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
if (!me) {
resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked()).IsNothing();
info.GetReturnValue().Set(resolver->GetPromise());
return;
}

Nan::Utf8String param(Nan::To<v8::String>(info[0]).ToLocalChecked());
std::string stream_id = std::string(*param);

v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(Nan::GetCurrentContext()).ToLocalChecked();
Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);
obj->Ref();

me->removeMediaStream(stream_id).then(
[persistent, obj] (boost::future<void>) {
obj->notifyFuture(persistent);
Expand Down Expand Up @@ -562,27 +598,36 @@ NAUV_WORK_CB(WebRtcConnection::eventsCallback) {
void WebRtcConnection::notifyFuture(Nan::Persistent<v8::Promise::Resolver> *persistent, ResultVariant result) {
boost::mutex::scoped_lock lock(mutex);
if (!future_async_) {
ELOG_DEBUG("%s, message: Future async does not exist anymore", toLog());
return;
}
ELOG_DEBUG("%s, message: Added future to async send", toLog());
ResultPair result_pair(persistent, result);
futures.push(result_pair);
future_async_->data = this;
Ref();
uv_async_send(future_async_);
}

NAUV_WORK_CB(WebRtcConnection::promiseResolver) {
Nan::HandleScope scope;
WebRtcConnection* obj = reinterpret_cast<WebRtcConnection*>(async->data);
if (!obj || !obj->me) {
if (!obj) {
ELOG_DEBUG("message: promiseResolver with null object");
return;
}
bool closed = false;
boost::mutex::scoped_lock lock(obj->mutex);
ELOG_DEBUG("%s, message: promiseResolver", obj->toLog());
ELOG_DEBUG("%s, message: promiseResolver, refs: %d", obj->toLog(), obj->futures.size());
while (!obj->futures.empty()) {
auto persistent = obj->futures.front().first;
v8::Local<v8::Promise::Resolver> resolver = Nan::New(*persistent);
ResultVariant r = obj->futures.front().second;
if (boost::get<std::string>(&r) != nullptr) {
std::string result = boost::get<std::string>(r);
if (result == "webrtcconnection_closed") {
closed = true;
}
resolver->Resolve(Nan::GetCurrentContext(), Nan::New(boost::get<std::string>(r).c_str()).ToLocalChecked())
.IsNothing();
} else if (boost::get<std::shared_ptr<erizo::SdpInfo>>(&r) != nullptr) {
Expand All @@ -598,8 +643,14 @@ NAUV_WORK_CB(WebRtcConnection::promiseResolver) {
persistent->Reset();
delete persistent;
obj->futures.pop();
obj->Unref();
v8::Isolate::GetCurrent()->RunMicrotasks();
obj->Unref();
}

ELOG_DEBUG("%s, message: promiseResolver finished, refs: %d, closed: %d", obj->toLog(),
obj->refs_, obj->closed_);
if (closed) {
obj->closeEvents();
obj->Unref();
}
ELOG_DEBUG("%s, message: promiseResolver finished", obj->toLog());
}
3 changes: 2 additions & 1 deletion erizoAPI/WebRtcConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class WebRtcConnection : public erizo::WebRtcConnectionEventListener,
~WebRtcConnection();

std::string toLog();
void close();
void closeEvents();
boost::future<std::string> close();

Nan::Callback *event_callback_;
uv_async_t *async_;
Expand Down
1 change: 1 addition & 0 deletions erizo_controller/erizoJS/erizoJSController.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ exports.ErizoJSController = (erizoJSId, threadPool, ioThreadPool) => {
const closePromise = node.close(sendOffer);

return closePromise.then(() => {
log.debug(`message: Node Closed, clientId: ${node.clientId}, streamId: ${node.streamId}`);
const client = clients.get(clientId);
if (client === undefined) {
log.debug('message: trying to close node with no associated client,' +
Expand Down
16 changes: 12 additions & 4 deletions erizo_controller/erizoJS/models/Connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,11 @@ class Connection extends events.EventEmitter {
}

getLocalSdp() {
if (!this.wrtc) {
return Promise.resolve();
}
return this.wrtc.getLocalDescription().then((desc) => {
if (!desc) {
if (!this.wrtc || !desc) {
log.error('Cannot get local description,',
logger.objectToLog(this.options), logger.objectToLog(this.options.metadata));
return '';
Expand Down Expand Up @@ -319,7 +322,7 @@ class Connection extends events.EventEmitter {
return Promise.resolve();
});
}
log.error(`message: Trying to remove mediaStream not found, id: ${id},`,
log.error(`message: Trying to remove mediaStream not found, clientId: ${this.clientId}, streamId: ${id}`,
logger.objectToLog(this.options), logger.objectToLog(this.options.metadata));
return promise;
}
Expand Down Expand Up @@ -488,9 +491,14 @@ class Connection extends events.EventEmitter {
promises.push(mediaStream.close());
});
Promise.all(promises).then(() => {
this.wrtc.close();
log.debug(`message: Closing WRTC, id: ${this.id},`,
logger.objectToLog(this.options), logger.objectToLog(this.options.metadata));
this.wrtc.close().then(() => {
log.debug(`message: WRTC closed, id: ${this.id},`,
logger.objectToLog(this.options), logger.objectToLog(this.options.metadata));
delete this.wrtc;
});
this.mediaStreams.clear();
delete this.wrtc;
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion erizo_controller/erizoJS/models/Publisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ class Publisher extends Source {
}

close() {
const removeMediaStreamPromise = this.connection.removeMediaStream(this.mediaStream.id);
const removeMediaStreamPromise = this.connection.removeMediaStream(this.mediaStream.id, false);
if (this.mediaStream.monitorInterval) {
clearInterval(this.mediaStream.monitorInterval);
}
Expand Down
3 changes: 2 additions & 1 deletion erizo_controller/erizoJS/models/Subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ class Subscriber extends NodeClass {
}

close(sendOffer = true) {
log.debug(`message: Closing subscriber, streamId:${this.streamId}, `,
log.debug(`message: Closing subscriber, clientId: ${this.clientId}, streamId: ${this.streamId}, `,
logger.objectToLog(this.options), logger.objectToLog(this.options.metadata));
this.publisher = undefined;
let promise = Promise.resolve();
if (this.connection) {
log.debug(`message: Removing Media Stream, clientId: ${this.clientId}, streamId: ${this.streamId}`);
promise = this.connection.removeMediaStream(this.mediaStream.id, sendOffer);
this.connection.removeListener('media_stream_event', this._mediaStreamListener);
}
Expand Down
3 changes: 2 additions & 1 deletion test/negotiation/utils/ClientStream.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
let currentClientStreamId = 0;
class ClientStream {
constructor(page) {
this.page = page;
this.id = parseInt(Math.random() * 10000);
this.id = currentClientStreamId++;
this.audio = true;
this.video = true;
this.data = true;
Expand Down
Loading