Skip to content

Commit

Permalink
Refs #3760. Fixing semaphore on stateful reader. (#323)
Browse files Browse the repository at this point in the history
  • Loading branch information
MiguelCompany authored and richiware committed Nov 15, 2018
1 parent efaef42 commit 1cd7435
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 47 deletions.
4 changes: 3 additions & 1 deletion include/fastrtps/rtps/reader/StatefulReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ class StatefulReader:public RTPSReader
/*!
* @remarks Nn thread-safe.
*/
bool findWriterProxy(const GUID_t& writerGUID, WriterProxy** WP);
bool findWriterProxy(const GUID_t& writerGUID, WriterProxy** wp);

void NotifyChanges(WriterProxy* wp);

//!ReaderTimes of the StatefulReader.
ReaderTimes m_times;
Expand Down
75 changes: 29 additions & 46 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,30 +389,7 @@ bool StatefulReader::processHeartbeatMsg(GUID_t &writerGUID, uint32_t hbCount, S
wpLock.unlock();

// Maybe now we have to notify user from new CacheChanges.
SequenceNumber_t last_notified = update_last_notified(proxGUID, pWP->available_changes_max());
SequenceNumber_t nextChangeToNotify = pWP->nextCacheChangeToBeNotified();
while(nextChangeToNotify != SequenceNumber_t::unknown())
{
if( (getListener()!=nullptr) && (nextChangeToNotify > last_notified) )
{
mp_history->postSemaphore();

CacheChange_t* ch_to_give = nullptr;
if(mp_history->get_change(nextChangeToNotify, proxGUID, &ch_to_give))
{
if(!ch_to_give->isRead)
{
getListener()->onNewCacheChangeAdded((RTPSReader*)this,ch_to_give);
}
}

// Search again the WriterProxy because could be removed after the unlock.
if(!findWriterProxy(proxGUID, &pWP))
break;
}

nextChangeToNotify = pWP->nextCacheChangeToBeNotified();
}
NotifyChanges(pWP);
}
}

Expand Down Expand Up @@ -511,39 +488,45 @@ bool StatefulReader::change_received(CacheChange_t* a_change, WriterProxy* prox)

writerProxyLock.unlock();

SequenceNumber_t last_notified = update_last_notified(proxGUID, prox->available_changes_max());
SequenceNumber_t nextChangeToNotify = prox->nextCacheChangeToBeNotified();
while(nextChangeToNotify != SequenceNumber_t::unknown())
NotifyChanges(prox);

return ret;
}

return false;
}

void StatefulReader::NotifyChanges(WriterProxy* prox)
{
GUID_t proxGUID = prox->m_att.guid;
SequenceNumber_t last_notified = update_last_notified(proxGUID, prox->available_changes_max());
SequenceNumber_t nextChangeToNotify = prox->nextCacheChangeToBeNotified();
while (nextChangeToNotify != SequenceNumber_t::unknown())
{
if (nextChangeToNotify > last_notified)
{
if (nextChangeToNotify > last_notified)
mp_history->postSemaphore();

if (getListener() != nullptr)
{
mp_history->postSemaphore();
CacheChange_t* ch_to_give = nullptr;

if (getListener() != nullptr)
if (mp_history->get_change(nextChangeToNotify, proxGUID, &ch_to_give))
{
CacheChange_t* ch_to_give = nullptr;

if (mp_history->get_change(nextChangeToNotify, proxGUID, &ch_to_give))
if (!ch_to_give->isRead)
{
if (!ch_to_give->isRead)
{
getListener()->onNewCacheChangeAdded((RTPSReader*)this, ch_to_give);
}
getListener()->onNewCacheChangeAdded((RTPSReader*)this, ch_to_give);
}

// Search again the WriterProxy because could be removed after the unlock.
if (!findWriterProxy(proxGUID, &prox))
break;
}
}

nextChangeToNotify = prox->nextCacheChangeToBeNotified();
// Search again the WriterProxy because could be removed after the unlock.
if (!findWriterProxy(proxGUID, &prox))
break;
}
}

return ret;
nextChangeToNotify = prox->nextCacheChangeToBeNotified();
}

return false;
}

bool StatefulReader::nextUntakenCache(CacheChange_t** change,WriterProxy** wpout)
Expand Down

0 comments on commit 1cd7435

Please sign in to comment.