Skip to content

Commit

Permalink
User WeakPtr instead of raw pointers in MediaDef (#1558)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcague authored Mar 23, 2020
1 parent 2f2ec34 commit a2df683
Show file tree
Hide file tree
Showing 20 changed files with 171 additions and 155 deletions.
58 changes: 29 additions & 29 deletions erizo/src/erizo/MediaDefinitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,21 +102,21 @@ class FeedbackSink {
public:
virtual ~FeedbackSink() {}
int deliverFeedback(std::shared_ptr<DataPacket> data_packet) {
return this->deliverFeedback_(data_packet);
return deliverFeedback_(data_packet);
}
private:
virtual int deliverFeedback_(std::shared_ptr<DataPacket> data_packet) = 0;
};

class FeedbackSource {
protected:
FeedbackSink* fb_sink_;
std::weak_ptr<FeedbackSink> fb_sink_;
public:
FeedbackSource(): fb_sink_{nullptr} {}
virtual ~FeedbackSource() {}
void setFeedbackSink(FeedbackSink* sink) {
fb_sink_ = sink;
}
FeedbackSource() : fb_sink_{} {}
virtual ~FeedbackSource() {}
void setFeedbackSink(std::weak_ptr<FeedbackSink> sink) {
fb_sink_ = sink;
}
};

/*
Expand All @@ -128,14 +128,14 @@ class MediaSink: public virtual Monitor {
uint32_t audio_sink_ssrc_;
uint32_t video_sink_ssrc_;
// Is it able to provide Feedback
FeedbackSource* sink_fb_source_;
std::weak_ptr<FeedbackSource> sink_fb_source_;

public:
int deliverAudioData(std::shared_ptr<DataPacket> data_packet) {
return this->deliverAudioData_(data_packet);
return deliverAudioData_(data_packet);
}
int deliverVideoData(std::shared_ptr<DataPacket> data_packet) {
return this->deliverVideoData_(data_packet);
return deliverVideoData_(data_packet);
}
uint32_t getVideoSinkSSRC() {
boost::mutex::scoped_lock lock(monitor_mutex_);
Expand All @@ -159,14 +159,14 @@ class MediaSink: public virtual Monitor {
bool isAudioSinkSSRC(uint32_t ssrc) {
return ssrc == audio_sink_ssrc_;
}
FeedbackSource* getFeedbackSource() {
std::weak_ptr<FeedbackSource> getFeedbackSource() {
boost::mutex::scoped_lock lock(monitor_mutex_);
return sink_fb_source_;
}
int deliverEvent(MediaEventPtr event) {
return this->deliverEvent_(event);
return deliverEvent_(event);
}
MediaSink() : audio_sink_ssrc_{0}, video_sink_ssrc_{0}, sink_fb_source_{nullptr} {}
MediaSink() : audio_sink_ssrc_{0}, video_sink_ssrc_{0}, sink_fb_source_{} {}
virtual ~MediaSink() {}

virtual boost::future<void> close() = 0;
Expand All @@ -185,29 +185,29 @@ class MediaSource: public virtual Monitor {
// SSRCs coming from the source
uint32_t audio_source_ssrc_;
std::vector<uint32_t> video_source_ssrc_list_;
MediaSink* video_sink_;
MediaSink* audio_sink_;
MediaSink* event_sink_;
std::weak_ptr<MediaSink> video_sink_;
std::weak_ptr<MediaSink> audio_sink_;
std::weak_ptr<MediaSink> event_sink_;
// can it accept feedback
FeedbackSink* source_fb_sink_;
std::weak_ptr<FeedbackSink> source_fb_sink_;

public:
void setAudioSink(MediaSink* audio_sink) {
boost::mutex::scoped_lock lock(monitor_mutex_);
this->audio_sink_ = audio_sink;
void setAudioSink(std::weak_ptr<MediaSink> audio_sink) {
boost::mutex::scoped_lock lock(monitor_mutex_);
audio_sink_ = audio_sink;
}
void setVideoSink(MediaSink* video_sink) {
boost::mutex::scoped_lock lock(monitor_mutex_);
this->video_sink_ = video_sink;
void setVideoSink(std::weak_ptr<MediaSink> video_sink) {
boost::mutex::scoped_lock lock(monitor_mutex_);
video_sink_ = video_sink;
}
void setEventSink(MediaSink* event_sink) {
void setEventSink(std::weak_ptr<MediaSink> event_sink) {
boost::mutex::scoped_lock lock(monitor_mutex_);
this->event_sink_ = event_sink;
event_sink_ = event_sink;
}

FeedbackSink* getFeedbackSink() {
boost::mutex::scoped_lock lock(monitor_mutex_);
return source_fb_sink_;
std::weak_ptr<FeedbackSink> getFeedbackSink() {
boost::mutex::scoped_lock lock(monitor_mutex_);
return source_fb_sink_;
}
virtual int sendPLI() = 0;
uint32_t getVideoSourceSSRC() {
Expand Down Expand Up @@ -255,7 +255,7 @@ class MediaSource: public virtual Monitor {
}

MediaSource() : audio_source_ssrc_{0}, video_source_ssrc_list_{std::vector<uint32_t>(1, 0)},
video_sink_{nullptr}, audio_sink_{nullptr}, event_sink_{nullptr}, source_fb_sink_{nullptr} {}
video_sink_{}, audio_sink_{}, event_sink_{}, source_fb_sink_{} {}
virtual ~MediaSource() {}

virtual boost::future<void> close() = 0;
Expand Down
58 changes: 34 additions & 24 deletions erizo/src/erizo/MediaStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ MediaStream::MediaStream(std::shared_ptr<Worker> worker,
}
ELOG_INFO("%s message: constructor, id: %s",
toLog(), media_stream_id.c_str());
source_fb_sink_ = this;
sink_fb_source_ = this;
stats_ = std::make_shared<Stats>();
log_stats_ = std::make_shared<Stats>();
quality_manager_ = std::make_shared<QualityManager>();
Expand Down Expand Up @@ -134,9 +132,9 @@ void MediaStream::syncClose() {
}
sending_ = false;
ready_ = false;
video_sink_ = nullptr;
audio_sink_ = nullptr;
fb_sink_ = nullptr;
video_sink_.reset();
audio_sink_.reset();
fb_sink_.reset();
pipeline_initialized_ = false;
pipeline_->close();
pipeline_.reset();
Expand All @@ -152,7 +150,16 @@ boost::future<void> MediaStream::close() {
});
}

bool MediaStream::init(bool doNotWaitForRemoteSdp) {
void MediaStream::init() {
if (source_fb_sink_.expired()) {
source_fb_sink_ = std::dynamic_pointer_cast<FeedbackSink>(shared_from_this());
}
if (sink_fb_source_.expired()) {
sink_fb_source_ = std::dynamic_pointer_cast<FeedbackSource>(shared_from_this());
}
}

bool MediaStream::configure(bool doNotWaitForRemoteSdp) {
if (doNotWaitForRemoteSdp) {
ready_ = true;
}
Expand Down Expand Up @@ -210,7 +217,6 @@ bool MediaStream::setRemoteSdp(std::shared_ptr<SdpInfo> sdp, int session_version
pipeline_->notifyUpdate();
return true;
}

bundle_ = remote_sdp_->isBundle;
if (video_ssrc_list_it != remote_sdp_->video_ssrc_map.end()) {
setVideoSourceSSRCList(video_ssrc_list_it->second);
Expand Down Expand Up @@ -493,7 +499,7 @@ int MediaStream::deliverEvent_(MediaEventPtr event) {
}

void MediaStream::onTransportData(std::shared_ptr<DataPacket> incoming_packet, Transport *transport) {
if ((audio_sink_ == nullptr && video_sink_ == nullptr && fb_sink_ == nullptr)) {
if (audio_sink_.expired() && video_sink_.expired() && fb_sink_.expired()) {
return;
}

Expand Down Expand Up @@ -537,53 +543,56 @@ void MediaStream::read(std::shared_ptr<DataPacket> packet) {
RtpHeader *head = reinterpret_cast<RtpHeader*> (buf);
RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf);
uint32_t recvSSRC = 0;
auto video_sink = video_sink_.lock();
auto audio_sink = audio_sink_.lock();
if (!chead->isRtcp()) {
recvSSRC = head->getSSRC();
} else if (chead->packettype == RTCP_Sender_PT || chead->packettype == RTCP_SDES_PT) { // Sender Report
recvSSRC = chead->getSSRC();
}
// DELIVER FEEDBACK (RR, FEEDBACK PACKETS)
if (chead->isFeedback()) {
if (fb_sink_ != nullptr && should_send_feedback_) {
fb_sink_->deliverFeedback(std::move(packet));
auto fb_sink = fb_sink_.lock();
if (should_send_feedback_ && fb_sink) {
fb_sink->deliverFeedback(std::move(packet));
}
} else {
// RTP or RTCP Sender Report
if (bundle_) {
// Check incoming SSRC
// Deliver data
if (isVideoSourceSSRC(recvSSRC) && video_sink_) {
if (isVideoSourceSSRC(recvSSRC) && video_sink) {
parseIncomingPayloadType(buf, len, VIDEO_PACKET);
parseIncomingExtensionId(buf, len, VIDEO_PACKET);
video_sink_->deliverVideoData(std::move(packet));
} else if (isAudioSourceSSRC(recvSSRC) && audio_sink_) {
video_sink->deliverVideoData(std::move(packet));
} else if (isAudioSourceSSRC(recvSSRC) && audio_sink) {
parseIncomingPayloadType(buf, len, AUDIO_PACKET);
parseIncomingExtensionId(buf, len, AUDIO_PACKET);
audio_sink_->deliverAudioData(std::move(packet));
audio_sink->deliverAudioData(std::move(packet));
} else {
ELOG_DEBUG("%s read video unknownSSRC: %u, localVideoSSRC: %u, localAudioSSRC: %u",
toLog(), recvSSRC, this->getVideoSourceSSRC(), this->getAudioSourceSSRC());
}
} else {
if (packet->type == AUDIO_PACKET && audio_sink_) {
if (packet->type == AUDIO_PACKET && audio_sink) {
parseIncomingPayloadType(buf, len, AUDIO_PACKET);
parseIncomingExtensionId(buf, len, AUDIO_PACKET);
// Firefox does not send SSRC in SDP
if (getAudioSourceSSRC() == 0) {
ELOG_DEBUG("%s discoveredAudioSourceSSRC:%u", toLog(), recvSSRC);
this->setAudioSourceSSRC(recvSSRC);
setAudioSourceSSRC(recvSSRC);
}
audio_sink_->deliverAudioData(std::move(packet));
} else if (packet->type == VIDEO_PACKET && video_sink_) {
audio_sink->deliverAudioData(std::move(packet));
} else if (packet->type == VIDEO_PACKET && video_sink) {
parseIncomingPayloadType(buf, len, VIDEO_PACKET);
parseIncomingExtensionId(buf, len, VIDEO_PACKET);
// Firefox does not send SSRC in SDP
if (getVideoSourceSSRC() == 0) {
ELOG_DEBUG("%s discoveredVideoSourceSSRC:%u", toLog(), recvSSRC);
this->setVideoSourceSSRC(recvSSRC);
setVideoSourceSSRC(recvSSRC);
}
// change ssrc for RTP packets, don't touch here if RTCP
video_sink_->deliverVideoData(std::move(packet));
video_sink->deliverVideoData(std::move(packet));
}
} // if not bundle
} // if not Feedback
Expand All @@ -602,7 +611,9 @@ void MediaStream::notifyMediaStreamEvent(const std::string& type, const std::str
}

void MediaStream::notifyToEventSink(MediaEventPtr event) {
event_sink_->deliverEvent(std::move(event));
if (auto event_sink = event_sink_.lock()) {
event_sink->deliverEvent(std::move(event));
}
}

int MediaStream::sendPLI() {
Expand All @@ -619,9 +630,8 @@ int MediaStream::sendPLI() {
}

void MediaStream::sendPLIToFeedback() {
if (fb_sink_) {
fb_sink_->deliverFeedback(RtpUtils::createPLI(this->getVideoSinkSSRC(),
this->getVideoSourceSSRC()));
if (auto fb_sink = fb_sink_.lock()) {
fb_sink->deliverFeedback(RtpUtils::createPLI(getVideoSinkSSRC(), getVideoSourceSSRC()));
}
}

Expand Down
3 changes: 2 additions & 1 deletion erizo/src/erizo/MediaStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
* Destructor.
*/
virtual ~MediaStream();
bool init(bool doNotWaitForRemoteSdp);
void init();
bool configure(bool doNotWaitForRemoteSdp);
boost::future<void> close() override;
virtual uint32_t getMaxVideoBW();
virtual uint32_t getBitrateFromMaxQualityLayer() { return bitrate_from_max_quality_layer_; }
Expand Down
21 changes: 11 additions & 10 deletions erizo/src/erizo/OneToManyProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace erizo {
DEFINE_LOGGER(OneToManyProcessor, "OneToManyProcessor");
OneToManyProcessor::OneToManyProcessor() : feedback_sink_{nullptr} {
OneToManyProcessor::OneToManyProcessor() : feedback_sink_{} {
ELOG_DEBUG("OneToManyProcessor constructor");
}

Expand Down Expand Up @@ -108,7 +108,7 @@ namespace erizo {
}

int OneToManyProcessor::deliverFeedback_(std::shared_ptr<DataPacket> fb_packet) {
if (feedback_sink_ != nullptr) {
if (auto feedback_sink = feedback_sink_.lock()) {
RtpUtils::forEachRtcpBlock(fb_packet, [this](RtcpHeader *chead) {
if (chead->isREMB()) {
for (uint8_t index = 0; index < chead->getREMBNumSSRC(); index++) {
Expand All @@ -125,7 +125,7 @@ namespace erizo {
chead->setSourceSSRC(publisher_->getVideoSourceSSRC());
}
});
feedback_sink_->deliverFeedback(fb_packet);
feedback_sink->deliverFeedback(fb_packet);
}
return 0;
}
Expand All @@ -152,11 +152,12 @@ namespace erizo {
ELOG_DEBUG("Subscribers ssrcs: Audio %u, video, %u from %u, %u ",
subscriber_stream->getAudioSinkSSRC(), subscriber_stream->getVideoSinkSSRC(),
publisher_->getAudioSourceSSRC() , publisher_->getVideoSourceSSRC());
FeedbackSource* fbsource = subscriber_stream->getFeedbackSource();
std::shared_ptr<FeedbackSource> fbsource = subscriber_stream->getFeedbackSource().lock();

if (fbsource != nullptr) {
if (fbsource) {
ELOG_DEBUG("adding fbsource");
fbsource->setFeedbackSink(this);
auto fbsink = std::dynamic_pointer_cast<FeedbackSink>(shared_from_this());
fbsource->setFeedbackSink(fbsink);
}
if (subscribers_.find(peer_id) != subscribers_.end()) {
ELOG_WARN("This OTM already has a subscriber with peer_id %s, substituting it", peer_id.c_str());
Expand Down Expand Up @@ -189,15 +190,15 @@ namespace erizo {
ELOG_DEBUG("OneToManyProcessor closeAll");
std::shared_ptr<boost::promise<void>> p = std::make_shared<boost::promise<void>>();
boost::future<void> f = p->get_future();
feedback_sink_ = nullptr;
feedback_sink_.reset();
publisher_.reset();
boost::unique_lock<boost::mutex> lock(monitor_mutex_);
std::map<std::string, std::shared_ptr<MediaSink>>::iterator it = subscribers_.begin();
while (it != subscribers_.end()) {
if ((*it).second != nullptr) {
FeedbackSource* fbsource = (*it).second->getFeedbackSource();
if (fbsource != nullptr) {
fbsource->setFeedbackSink(nullptr);
std::shared_ptr<FeedbackSource> fbsource = (*it).second->getFeedbackSource().lock();
if (fbsource) {
fbsource->setFeedbackSink(std::shared_ptr<FeedbackSink>());
}
}
subscribers_.erase(it++);
Expand Down
5 changes: 3 additions & 2 deletions erizo/src/erizo/OneToManyProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ class MediaStream;
* Represents a One to Many connection.
* Receives media from one publisher and retransmits it to every subscriber.
*/
class OneToManyProcessor : public MediaSink, public FeedbackSink {
class OneToManyProcessor
: public MediaSink, public FeedbackSink, public std::enable_shared_from_this<OneToManyProcessor> {
DECLARE_LOGGER();

public:
Expand Down Expand Up @@ -61,7 +62,7 @@ class OneToManyProcessor : public MediaSink, public FeedbackSink {
uint32_t translateAndMaybeAdaptForSimulcast(uint32_t orig_ssrc);

private:
FeedbackSink* feedback_sink_;
std::weak_ptr<FeedbackSink> feedback_sink_;
std::map<std::string, std::shared_ptr<MediaSink>> subscribers_;
std::shared_ptr<MediaSource> publisher_;
std::string publisher_id_;
Expand Down
8 changes: 5 additions & 3 deletions erizo/src/erizo/media/ExternalInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ int ExternalInput::deliverFeedback_(std::shared_ptr<DataPacket> fb_packet) {
}

void ExternalInput::receiveRtpData(unsigned char* rtpdata, int len) {
if (video_sink_ != nullptr) {
if (auto video_sink = video_sink_.lock()) {
RtcpHeader* head = reinterpret_cast<RtcpHeader*>(rtpdata);
if (!head->isRtcp()) {
if (getVideoSourceSSRC() == 0) {
Expand All @@ -228,7 +228,7 @@ void ExternalInput::receiveRtpData(unsigned char* rtpdata, int len) {
} else {
packet->is_keyframe = false;
}
video_sink_->deliverVideoData(packet);
video_sink->deliverVideoData(packet);
}
}

Expand Down Expand Up @@ -276,7 +276,9 @@ void ExternalInput::receiveLoop() {
if (length > 0) {
std::shared_ptr<DataPacket> packet = std::make_shared<DataPacket>(0,
reinterpret_cast<char*>(decodedBuffer_.get()), length, AUDIO_PACKET);
audio_sink_->deliverAudioData(packet);
if (auto audio_sink = audio_sink_.lock()) {
audio_sink->deliverAudioData(packet);
}
}
}
}
Expand Down
Loading

0 comments on commit a2df683

Please sign in to comment.