Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added events from mediaStream to ErizoJS #1194

Merged
merged 5 commits into from
Apr 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions erizo/src/erizo/MediaStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,18 @@ void MediaStream::read(std::shared_ptr<DataPacket> packet) {
} // if not Feedback
}

void MediaStream::setMediaStreamEventListener(MediaStreamEventListener* listener) {
boost::mutex::scoped_lock lock(event_listener_mutex_);
this->media_stream_event_listener_ = listener;
}

void MediaStream::notifyMediaStreamEvent(const std::string& type, const std::string& message) {
boost::mutex::scoped_lock lock(event_listener_mutex_);
if (this->media_stream_event_listener_ != nullptr) {
media_stream_event_listener_->notifyMediaStreamEvent(type, message);
}
}

void MediaStream::notifyToEventSink(MediaEventPtr event) {
event_sink_->deliverEvent(event);
}
Expand Down
16 changes: 16 additions & 0 deletions erizo/src/erizo/MediaStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ class MediaStreamStatsListener {
virtual void notifyStats(const std::string& message) = 0;
};


class MediaStreamEventListener {
public:
virtual ~MediaStreamEventListener() {
}
virtual void notifyMediaStreamEvent(const std::string& type, const std::string& message) = 0;
};
/**
* A MediaStream. This class represents a Media Stream that can be established with other peers via a SDP negotiation
*/
Expand Down Expand Up @@ -76,6 +83,13 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,

WebRTCEvent getCurrentState();

/**
* Sets the Event Listener for this MediaStream
*/
void setMediaStreamEventListener(MediaStreamEventListener* listener);

void notifyMediaStreamEvent(const std::string& type, const std::string& message);

/**
* Sets the Stats Listener for this MediaStream
*/
Expand Down Expand Up @@ -148,6 +162,8 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
// parses incoming payload type, replaces occurence in buf

private:
boost::mutex event_listener_mutex_;
MediaStreamEventListener* media_stream_event_listener_;
std::shared_ptr<WebRtcConnection> connection_;
std::string stream_id_;
std::string mslabel_;
Expand Down
6 changes: 6 additions & 0 deletions erizo/src/erizo/Stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@ namespace erizo {
return root_.toString();
}

void Stats::setStatsListener(MediaStreamStatsListener* listener) {
boost::mutex::scoped_lock lock(listener_mutex_);
listener_ = listener;
}

void Stats::sendStats() {
boost::mutex::scoped_lock lock(listener_mutex_);
if (listener_) listener_->notifyStats(getStats());
}
} // namespace erizo
6 changes: 2 additions & 4 deletions erizo/src/erizo/Stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ class Stats : public Service {

std::string getStats();

inline void setStatsListener(MediaStreamStatsListener* listener) {
listener_ = listener;
}

void setStatsListener(MediaStreamStatsListener* listener);
void sendStats();

private:
boost::mutex listener_mutex_;
MediaStreamStatsListener* listener_;
StatNode root_;
};
Expand Down
68 changes: 38 additions & 30 deletions erizo/src/erizo/WebRtcConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,12 @@ void WebRtcConnection::close() {
}

bool WebRtcConnection::init() {
if (conn_event_listener_ != nullptr) {
conn_event_listener_->notifyEvent(global_state_, "");
}
return true;
maybeNotifyWebRtcConnectionEvent(global_state_, "");
return true;
}

bool WebRtcConnection::createOffer(bool video_enabled, bool audioEnabled, bool bundle) {
boost::mutex::scoped_lock lock(updateStateMutex_);
boost::mutex::scoped_lock lock(update_state_mutex_);
bundle_ = bundle;
video_enabled_ = video_enabled;
audio_enabled_ = audioEnabled;
Expand Down Expand Up @@ -147,10 +145,10 @@ bool WebRtcConnection::createOffer(bool video_enabled, bool audioEnabled, bool b
audio_transport_->start();
}
}
if (conn_event_listener_ != nullptr) {
std::string msg = this->getLocalSdp();
conn_event_listener_->notifyEvent(global_state_, msg);
}

std::string msg = this->getLocalSdp();
maybeNotifyWebRtcConnectionEvent(global_state_, msg);

std::weak_ptr<WebRtcConnection> weak_this = shared_from_this();
forEachMediaStreamAsync([weak_this] (const std::shared_ptr<MediaStream> &media_stream) {
if (auto connection = weak_this.lock()) {
Expand All @@ -169,7 +167,7 @@ void WebRtcConnection::addMediaStream(std::shared_ptr<MediaStream> media_stream)

void WebRtcConnection::removeMediaStream(const std::string& stream_id) {
asyncTask([stream_id] (std::shared_ptr<WebRtcConnection> connection) {
boost::mutex::scoped_lock lock(connection->updateStateMutex_);
boost::mutex::scoped_lock lock(connection->update_state_mutex_);
ELOG_DEBUG("%s message: removing mediaStream, id: %s", connection->toLog(), stream_id.c_str());
connection->media_streams_.erase(std::remove_if(connection->media_streams_.begin(),
connection->media_streams_.end(),
Expand Down Expand Up @@ -218,7 +216,7 @@ bool WebRtcConnection::setRemoteSdpInfo(std::shared_ptr<SdpInfo> sdp, std::strin
}

std::shared_ptr<SdpInfo> WebRtcConnection::getLocalSdpInfo() {
boost::mutex::scoped_lock lock(updateStateMutex_);
boost::mutex::scoped_lock lock(update_state_mutex_);
ELOG_DEBUG("%s message: getting local SDPInfo", toLog());
forEachMediaStream([this] (const std::shared_ptr<MediaStream> &media_stream) {
if (!media_stream->isRunning() || media_stream->isPublisher()) {
Expand Down Expand Up @@ -287,7 +285,7 @@ void WebRtcConnection::onRemoteSdpsSetToMediaStreams(std::string stream_id) {
asyncTask([stream_id] (std::shared_ptr<WebRtcConnection> connection) {
ELOG_DEBUG("%s message: SDP processed", connection->toLog());
std::string sdp = connection->getLocalSdp();
connection->conn_event_listener_->notifyEvent(CONN_SDP_PROCESSED, sdp, stream_id);
connection->maybeNotifyWebRtcConnectionEvent(CONN_SDP_PROCESSED, sdp, stream_id);
});
}

Expand Down Expand Up @@ -453,19 +451,17 @@ void WebRtcConnection::onCandidate(const CandidateInfo& cand, Transport *transpo
std::string sdp = local_sdp_->addCandidate(cand);
ELOG_DEBUG("%s message: Discovered New Candidate, candidate: %s", toLog(), sdp.c_str());
if (trickle_enabled_) {
if (conn_event_listener_ != nullptr) {
if (!bundle_) {
std::string object = this->getJSONCandidate(transport->transport_name, sdp);
conn_event_listener_->notifyEvent(CONN_CANDIDATE, object);
} else {
if (remote_sdp_->hasAudio) {
std::string object = this->getJSONCandidate("audio", sdp);
conn_event_listener_->notifyEvent(CONN_CANDIDATE, object);
}
if (remote_sdp_->hasVideo) {
std::string object2 = this->getJSONCandidate("video", sdp);
conn_event_listener_->notifyEvent(CONN_CANDIDATE, object2);
}
if (!bundle_) {
std::string object = this->getJSONCandidate(transport->transport_name, sdp);
maybeNotifyWebRtcConnectionEvent(CONN_CANDIDATE, object);
} else {
if (remote_sdp_->hasAudio) {
std::string object = this->getJSONCandidate("audio", sdp);
maybeNotifyWebRtcConnectionEvent(CONN_CANDIDATE, object);
}
if (remote_sdp_->hasVideo) {
std::string object2 = this->getJSONCandidate("video", sdp);
maybeNotifyWebRtcConnectionEvent(CONN_CANDIDATE, object2);
}
}
}
Expand Down Expand Up @@ -540,6 +536,15 @@ void WebRtcConnection::onTransportData(std::shared_ptr<DataPacket> packet, Trans
}
}

void WebRtcConnection::maybeNotifyWebRtcConnectionEvent(const WebRTCEvent& event, const std::string& message,
const std::string& stream_id) {
boost::mutex::scoped_lock lock(event_listener_mutex_);
if (!conn_event_listener_) {
return;
}
conn_event_listener_->notifyEvent(event, message, stream_id);
}

void WebRtcConnection::asyncTask(std::function<void(std::shared_ptr<WebRtcConnection>)> f) {
std::weak_ptr<WebRtcConnection> weak_this = shared_from_this();
worker_->task([weak_this, f] {
Expand All @@ -550,7 +555,7 @@ void WebRtcConnection::asyncTask(std::function<void(std::shared_ptr<WebRtcConnec
}

void WebRtcConnection::updateState(TransportState state, Transport * transport) {
boost::mutex::scoped_lock lock(updateStateMutex_);
boost::mutex::scoped_lock lock(update_state_mutex_);
WebRTCEvent temp = global_state_;
std::string msg = "";
ELOG_DEBUG("%s transportName: %s, new_state: %d", toLog(), transport->transport_name.c_str(), state);
Expand Down Expand Up @@ -655,10 +660,8 @@ void WebRtcConnection::updateState(TransportState state, Transport * transport)

global_state_ = temp;

if (conn_event_listener_ != nullptr) {
ELOG_INFO("%s newGlobalState: %d", toLog(), global_state_);
conn_event_listener_->notifyEvent(global_state_, msg);
}
ELOG_INFO("%s newGlobalState: %d", toLog(), global_state_);
maybeNotifyWebRtcConnectionEvent(global_state_, msg);
}

void WebRtcConnection::trackTransportInfo() {
Expand Down Expand Up @@ -687,6 +690,11 @@ void WebRtcConnection::setMetadata(std::map<std::string, std::string> metadata)
setLogContext(metadata);
}

void WebRtcConnection::setWebRtcConnectionEventListener(WebRtcConnectionEventListener* listener) {
boost::mutex::scoped_lock lock(event_listener_mutex_);
this->conn_event_listener_ = listener;
}

WebRTCEvent WebRtcConnection::getCurrentState() {
return global_state_;
}
Expand Down
10 changes: 5 additions & 5 deletions erizo/src/erizo/WebRtcConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,7 @@ class WebRtcConnection: public TransportListener, public LogContext,
/**
* Sets the Event Listener for this WebRtcConnection
*/
inline void setWebRtcConnectionEventListener(WebRtcConnectionEventListener* listener) {
this->conn_event_listener_ = listener;
}

void setWebRtcConnectionEventListener(WebRtcConnectionEventListener* listener);

/**
* Gets the current state of the Ice Connection
Expand Down Expand Up @@ -162,6 +159,8 @@ class WebRtcConnection: public TransportListener, public LogContext,
void trackTransportInfo();
void onRtcpFromTransport(std::shared_ptr<DataPacket> packet, Transport *transport);
void onREMBFromTransport(RtcpHeader *chead, Transport *transport);
void maybeNotifyWebRtcConnectionEvent(const WebRTCEvent& event, const std::string& message,
const std::string& stream_id = "");

private:
std::string connection_id_;
Expand All @@ -182,7 +181,8 @@ class WebRtcConnection: public TransportListener, public LogContext,
std::shared_ptr<Stats> stats_;
WebRTCEvent global_state_;

boost::mutex updateStateMutex_; // , slideShowMutex_;
boost::mutex update_state_mutex_;
boost::mutex event_listener_mutex_;

std::shared_ptr<Worker> worker_;
std::shared_ptr<IOWorker> io_worker_;
Expand Down
Loading