Skip to content

Commit

Permalink
Make getLocalSdpInfo async (lynckia#1441)
Browse files Browse the repository at this point in the history
  • Loading branch information
lodoyun authored Jul 25, 2019
1 parent 11c08d9 commit 7405aa9
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 43 deletions.
18 changes: 17 additions & 1 deletion erizo/src/erizo/WebRtcConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ bool WebRtcConnection::createOfferSync(bool video_enabled, bool audio_enabled, b

boost::future<void> WebRtcConnection::addMediaStream(std::shared_ptr<MediaStream> media_stream) {
return asyncTask([media_stream] (std::shared_ptr<WebRtcConnection> connection) {
boost::mutex::scoped_lock lock(connection->update_state_mutex_);
ELOG_DEBUG("%s message: Adding mediaStream, id: %s", connection->toLog(), media_stream->getId().c_str());
connection->media_streams_.push_back(media_stream);
});
Expand Down Expand Up @@ -268,7 +269,22 @@ void WebRtcConnection::copyDataToLocalSdpIndo(std::shared_ptr<SdpInfo> sdp_info)
});
}

std::shared_ptr<SdpInfo> WebRtcConnection::getLocalSdpInfo() {
boost::future<std::shared_ptr<SdpInfo>> WebRtcConnection::getLocalSdpInfo() {
std::weak_ptr<WebRtcConnection> weak_this = shared_from_this();
auto task_promise = std::make_shared<boost::promise<std::shared_ptr<SdpInfo>>>();
worker_->task([weak_this, task_promise] {
std::shared_ptr<SdpInfo> info;
if (auto this_ptr = weak_this.lock()) {
info = this_ptr->getLocalSdpInfoSync();
} else {
ELOG_WARN("%s message: Error trying to getLocalSdpInfo, returning empty", this_ptr->toLog());
}
task_promise->set_value(info);
});
return task_promise->get_future();
}

std::shared_ptr<SdpInfo> WebRtcConnection::getLocalSdpInfoSync() {
boost::mutex::scoped_lock lock(update_state_mutex_);
ELOG_DEBUG("%s message: getting local SDPInfo", toLog());
forEachMediaStream([this] (const std::shared_ptr<MediaStream> &media_stream) {
Expand Down
3 changes: 2 additions & 1 deletion erizo/src/erizo/WebRtcConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ class WebRtcConnection: public TransportListener, public LogContext,
* Obtains the local SDP.
* @return The SDP as a SdpInfo.
*/
std::shared_ptr<SdpInfo> getLocalSdpInfo();
boost::future<std::shared_ptr<SdpInfo>> getLocalSdpInfo();
std::shared_ptr<SdpInfo> getLocalSdpInfoSync();
/**
* Copy some SdpInfo data to local SdpInfo
*/
Expand Down
37 changes: 27 additions & 10 deletions erizoAPI/WebRtcConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,17 @@ NAN_METHOD(WebRtcConnection::getLocalDescription) {
if (!me) {
return;
}
v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(info.GetIsolate());
Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);
obj->Ref();

std::shared_ptr<erizo::SdpInfo> sdp_info = std::make_shared<erizo::SdpInfo>(*me->getLocalSdpInfo().get());

v8::Local<v8::Object> instance = ConnectionDescription::NewInstance();
ConnectionDescription* description = ObjectWrap::Unwrap<ConnectionDescription>(instance);
description->me = sdp_info;
info.GetReturnValue().Set(instance);
me->getLocalSdpInfo().then(
[persistent, obj] (boost::future<std::shared_ptr<erizo::SdpInfo>> fut) {
std::shared_ptr<erizo::SdpInfo> sdp_info = std::make_shared<erizo::SdpInfo>(*fut.get().get());
ResultVariant result = sdp_info;
obj->notifyFuture(persistent, result);
});
info.GetReturnValue().Set(resolver->GetPromise());
}

NAN_METHOD(WebRtcConnection::copySdpToLocalDescription) {
Expand Down Expand Up @@ -492,12 +496,13 @@ NAUV_WORK_CB(WebRtcConnection::eventsCallback) {
ELOG_DEBUG("%s, message: eventsCallback finished", obj->toLog());
}

void WebRtcConnection::notifyFuture(Nan::Persistent<v8::Promise::Resolver> *persistent) {
void WebRtcConnection::notifyFuture(Nan::Persistent<v8::Promise::Resolver> *persistent, ResultVariant result) {
boost::mutex::scoped_lock lock(mutex);
if (!future_async_) {
return;
}
futures.push(persistent);
ResultPair result_pair(persistent, result);
futures.push(result_pair);
future_async_->data = this;
uv_async_send(future_async_);
}
Expand All @@ -512,9 +517,21 @@ NAUV_WORK_CB(WebRtcConnection::promiseResolver) {
ELOG_DEBUG("%s, message: promiseResolver", obj->toLog());
obj->futures_manager_.cleanResolvedFutures();
while (!obj->futures.empty()) {
auto persistent = obj->futures.front();
auto persistent = obj->futures.front().first;
v8::Local<v8::Promise::Resolver> resolver = Nan::New(*persistent);
resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked());
ResultVariant r = obj->futures.front().second;
if (boost::get<std::string>(&r) != nullptr) {
resolver->Resolve(Nan::GetCurrentContext(), Nan::New(boost::get<std::string>(r).c_str()).ToLocalChecked());
} else if (boost::get<std::shared_ptr<erizo::SdpInfo>>(&r) != nullptr) {
std::shared_ptr<erizo::SdpInfo> sdp_info = boost::get<std::shared_ptr<erizo::SdpInfo>>(r);
v8::Local<v8::Object> instance = ConnectionDescription::NewInstance();
ConnectionDescription* description = ObjectWrap::Unwrap<ConnectionDescription>(instance);
description->me = sdp_info;
resolver->Resolve(Nan::GetCurrentContext(), instance);
} else {
ELOG_WARN("%s, message: Resolving promise with no valid value, using empty string", obj->toLog());
resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked());
}
obj->futures.pop();
obj->Unref();
}
Expand Down
9 changes: 7 additions & 2 deletions erizoAPI/WebRtcConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <nan.h>
#include <WebRtcConnection.h>
#include <logger.h>
#include <boost/variant.hpp>
#include "FuturesManager.h"
#include "MediaDefinitions.h"
#include "OneToManyProcessor.h"
Expand All @@ -13,6 +14,9 @@
#include <string>
#include <future> // NOLINT

typedef boost::variant<std::string, std::shared_ptr<erizo::SdpInfo>> ResultVariant;
typedef std::pair<Nan::Persistent<v8::Promise::Resolver> *, ResultVariant> ResultPair;

/*
* Wrapper class of erizo::WebRtcConnection
*
Expand All @@ -28,7 +32,7 @@ class WebRtcConnection : public erizo::WebRtcConnectionEventListener,
std::shared_ptr<erizo::WebRtcConnection> me;
std::queue<int> event_status;
std::queue<std::string> event_messages;
std::queue<Nan::Persistent<v8::Promise::Resolver> *> futures;
std::queue<ResultPair> futures;
FuturesManager futures_manager_;

boost::mutex mutex;
Expand Down Expand Up @@ -118,7 +122,8 @@ class WebRtcConnection : public erizo::WebRtcConnectionEventListener,

virtual void notifyEvent(erizo::WebRTCEvent event,
const std::string& message = "");
virtual void notifyFuture(Nan::Persistent<v8::Promise::Resolver> *persistent);
virtual void notifyFuture(Nan::Persistent<v8::Promise::Resolver> *persistent,
ResultVariant result = ResultVariant());
};

#endif // ERIZOAPI_WEBRTCCONNECTION_H_
70 changes: 42 additions & 28 deletions erizo_controller/erizoJS/models/Connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,60 +128,74 @@ class Connection extends events.EventEmitter {
}

createAnswer() {
return { type: 'answer', sdp: this.getLocalSdp() };
return this.getLocalSdp().then((info) => {
log.debug('getting local sdp for answer', info);
return { type: 'answer', sdp: info };
});
}

createOffer() {
return { type: 'offer', sdp: this.getLocalSdp() };
return this.getLocalSdp().then((info) => {
log.debug('getting local sdp for offer', info);
return { type: 'offer', sdp: info };
});
}

getLocalSdp() {
this.wrtc.localDescription = new SessionDescription(this.wrtc.getLocalDescription());
const sdp = this.wrtc.localDescription.getSdp(this.sessionVersion);
this.sessionVersion += 1;
let message = sdp.toString();
message = message.replace(this.options.privateRegexp, this.options.publicIP);
return message;
return this.wrtc.getLocalDescription().then((desc) => {
this.wrtc.localDescription = new SessionDescription(desc);
const sdp = this.wrtc.localDescription.getSdp(this.sessionVersion);
this.sessionVersion += 1;
let message = sdp.toString();
message = message.replace(this.options.privateRegexp, this.options.publicIP);
return message;
});
}

sendOffer() {
if (!this.alreadyGathered && !this.trickleIce) {
return;
}
const info = this.createOffer();
log.debug(`message: sendAnswer sending event, type: ${info.type}, sessionVersion: ${this.sessionVersion}`);
this._onStatusEvent(info, CONN_SDP);
this.createOffer().then((info) => {
log.debug(`message: sendOffer sending event, type: ${info.type}, sessionVersion: ${this.sessionVersion}`);
this._onStatusEvent(info, CONN_SDP);
});
}

sendAnswer(evt = CONN_SDP_PROCESSED, forceOffer = false) {
if (!this.alreadyGathered && !this.trickleIce) {
return;
}
const info = this.options.createOffer || forceOffer ? this.createOffer() : this.createAnswer();
log.debug(`message: sendAnswer sending event, type: ${info.type}, sessionVersion: ${this.sessionVersion}`);
this._onStatusEvent(info, evt);
const promise =
this.options.createOffer || forceOffer ? this.createOffer() : this.createAnswer();
promise.then((info) => {
log.debug(`message: sendAnswer sending event, type: ${info.type}, sessionVersion: ${this.sessionVersion}`);
this._onStatusEvent(info, evt);
});
}

_resendLastAnswer(evt, streamId, label, forceOffer = false, removeStream = false) {
if (!this.wrtc || !this.wrtc.localDescription) {
log.debug('message: _resendLastAnswer, this.wrtc or this.wrtc.localDescription are not present');
return Promise.reject('fail');
}
this.wrtc.localDescription = new SessionDescription(this.wrtc.getLocalDescription());
const sdp = this.wrtc.localDescription.getSdp(this.sessionVersion);
const stream = sdp.getStream(label);
if (stream && removeStream) {
log.info(`resendLastAnswer: StreamId ${streamId} is stream and removeStream, label ${label}, sessionVersion ${this.sessionVersion}`);
return Promise.reject('retry');
}
this.sessionVersion += 1;
let message = sdp.toString();
message = message.replace(this.options.privateRegexp, this.options.publicIP);
return this.wrtc.getLocalDescription().then((localDescription) => {
this.wrtc.localDescription = new SessionDescription(localDescription);
const sdp = this.wrtc.localDescription.getSdp(this.sessionVersion);
const stream = sdp.getStream(label);
if (stream && removeStream) {
log.info(`resendLastAnswer: StreamId ${streamId} is stream and removeStream, label ${label}, sessionVersion ${this.sessionVersion}`);
return Promise.reject('retry');
}
this.sessionVersion += 1;
let message = sdp.toString();
message = message.replace(this.options.privateRegexp, this.options.publicIP);

const info = { type: this.options.createOffer || forceOffer ? 'offer' : 'answer', sdp: message };
log.debug(`message: _resendLastAnswer sending event, type: ${info.type}, streamId: ${streamId}`);
this._onStatusEvent(info, evt);
return Promise.resolve();
const info = { type: this.options.createOffer || forceOffer ? 'offer' : 'answer', sdp: message };
log.debug(`message: _resendLastAnswer sending event, type: ${info.type}, streamId: ${streamId}`);
this._onStatusEvent(info, evt);
return Promise.resolve();
});
}

init(createOffer = this.options.createOffer) {
Expand Down
3 changes: 2 additions & 1 deletion erizo_controller/test/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ module.exports.reset = () => {
createOffer: sinon.stub(),
setRemoteSdp: sinon.stub(),
setRemoteDescription: sinon.stub(),
getLocalDescription: sinon.stub().returns(module.exports.ConnectionDescription),
getLocalDescription: sinon.stub()
.returns(Promise.resolve(module.exports.ConnectionDescription)),
addRemoteCandidate: sinon.stub(),
addMediaStream: sinon.stub().returns(Promise.resolve()),
removeMediaStream: sinon.stub(),
Expand Down

0 comments on commit 7405aa9

Please sign in to comment.