Skip to content

Commit

Permalink
Fixes #32.
Browse files Browse the repository at this point in the history
  • Loading branch information
richiprosima committed Mar 31, 2016
2 parents 9c652cd + 2427145 commit 5209ca0
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 29 deletions.
2 changes: 1 addition & 1 deletion include/fastrtps/rtps/common/CacheChange.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ namespace eprosima

struct ChangeForReaderCmp
{
bool operator()(const ChangeForReader_t& a, const ChangeForReader_t& b)
bool operator()(const ChangeForReader_t& a, const ChangeForReader_t& b) const
{
return a.seq_num_ < b.seq_num_;
}
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,10 @@ void PDPSimple::assertRemoteParticipantLiveliness(const GuidPrefix_t& guidP)
// TODO Ricardo: Study if isAlive attribute is necessary.
(*it)->isAlive = true;
if((*it)->mp_leaseDurationTimer != nullptr)
{
(*it)->mp_leaseDurationTimer->cancel_timer();
(*it)->mp_leaseDurationTimer->restart_timer();
}
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ bool StatefulReader::processDataMsg(CacheChange_t *change)

if(pWP == nullptr && getGuid().entityId == c_EntityId_SPDPReader)
{
mp_RTPSParticipant->assertRemoteRTPSParticipantLiveliness(change_to_add->writerGUID.guidPrefix);
mp_RTPSParticipant->assertRemoteRTPSParticipantLiveliness(change->writerGUID.guidPrefix);
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,10 @@ bool StatelessReader::processDataMsg(CacheChange_t *change)
logInfo(RTPS_MSG_IN,IDSTRING"MessageReceiver not add change "
<<change_to_add->sequenceNumber, C_BLUE);
releaseCache(change_to_add);

if(getGuid().entityId == c_EntityId_SPDPReader)
{
mp_RTPSParticipant->assertRemoteRTPSParticipantLiveliness(change_to_add->writerGUID.guidPrefix);
mp_RTPSParticipant->assertRemoteRTPSParticipantLiveliness(change->writerGUID.guidPrefix);
}
}
}
Expand Down
19 changes: 5 additions & 14 deletions src/cpp/rtps/writer/ReaderProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,12 @@ bool ReaderProxy::getChangeForReader(const SequenceNumber_t& seqNum, ChangeForRe
bool ReaderProxy::acked_changes_set(const SequenceNumber_t& seqNum)
{
boost::lock_guard<boost::recursive_mutex> guard(*mp_mutex);
auto chit = m_changesForReader.find(ChangeForReader_t(seqNum));

auto chit = m_changesForReader.begin();

if(chit != m_changesForReader.end())
while(chit != m_changesForReader.end() && chit->getSequenceNumber() < seqNum)
{
if(chit == m_changesForReader.begin())
{ // If first, remove and cleanup.
m_changesForReader.erase(chit);
cleanup();
}
else
{
ChangeForReader_t newch(*chit);
newch.setStatus(ACKNOWLEDGED);
m_changesForReader.erase(chit);
m_changesForReader.insert(newch);
}
chit = m_changesForReader.erase(chit);
}

return false;
Expand Down Expand Up @@ -213,6 +203,7 @@ void ReaderProxy::underway_changes_to_unacknowledged()
}
}

// TODO Study if cleanup is necessary
void ReaderProxy::underway_changes_to_acknowledged()
{
boost::lock_guard<boost::recursive_mutex> guard(*mp_mutex);
Expand Down
4 changes: 0 additions & 4 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,11 @@ void StatefulWriter::unsent_change_added_to_history(CacheChange_t* change)
else
changeForReader.setStatus(UNACKNOWLEDGED);

// Block access to ReaderProxy
(*it)->mp_mutex->lock();
changeForReader.setRelevance((*it)->rtps_is_relevant(change));
(*it)->m_changesForReader.insert(changeForReader);
unilocList.push_back((*it)->m_att.endpoint.unicastLocatorList);
multilocList.push_back((*it)->m_att.endpoint.multicastLocatorList);
expectsInlineQos |= (*it)->m_att.expectsInlineQos;
// Release access before restart the timer.
(*it)->mp_mutex->unlock();

(*it)->mp_nackSupression->restart_timer();
}
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/writer/timedevent/NackResponseDelay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/

#include <fastrtps/rtps/writer/timedevent/NackResponseDelay.h>
#include <fastrtps/rtps/writer/timedevent/NackSupressionDuration.h>
#include <fastrtps/rtps/resources/ResourceEvent.h>

#include <fastrtps/rtps/writer/StatefulWriter.h>
Expand Down Expand Up @@ -107,6 +108,8 @@ void NackResponseDelay::event(EventCode code, const char* msg)
mp_RP->mp_SFW->getRTPSParticipant()->sendSync(&m_cdrmessages.m_rtpsmsg_fullmsg, (*lit));
}
}
else
mp_RP->mp_nackSupression->restart_timer();
}
else if(code == EVENT_ABORT)
{
Expand Down
20 changes: 13 additions & 7 deletions test/blackbox/BlackboxTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,19 @@ TEST(BlackBox, PubSubAsReliableData64kb)

// Send some data.
std::list<uint16_t> msgs = reader.getNonReceivedMessages();
writer.send(msgs);

// Destroy the writer participant.
writer.destroy();
writer.send(msgs);
reader.block(*msgs.rbegin(), std::chrono::seconds(60));

// Check that reader receives the unmatched.
reader.waitRemoval();
msgs = reader.getNonReceivedMessages();
if(msgs.size() != 0)
{
std::cout << "Samples not received:";
for(std::list<uint16_t>::iterator it = msgs.begin(); it != msgs.end(); ++it)
std::cout << " " << *it << " ";
std::cout << std::endl;
}
ASSERT_EQ(msgs.size(), 0);
}

// Test created to check bug #1555 (Github #31)
Expand All @@ -359,8 +365,8 @@ TEST(BlackBox, PubSubKeepLast)
std::list<uint16_t> msgs = reader.getNonReceivedMessages();

writer.send(msgs);
std::this_thread::sleep_for(std::chrono::seconds(1));
reader.read(*msgs.rbegin(), std::chrono::seconds(60));
std::this_thread::sleep_for(std::chrono::seconds(5));
reader.read(*msgs.rbegin(), std::chrono::seconds(120));

msgs = reader.getNonReceivedMessages();
if(msgs.size() != 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ TEST(TimedEvent, EventNonAutoDestruc_AutoRestartAndDeleteRandomly)
// Restart destriction counter.
MockEvent::destructed_ = 0;

boost::mt19937 rng(std::time(0));
boost::mt19937 rng(static_cast<uint32_t>(std::time(nullptr)));
boost::uniform_int<> range(10, 100);
boost::variate_generator<boost::mt19937, boost::uniform_int<>> random(rng, range);

Expand Down

0 comments on commit 5209ca0

Please sign in to comment.