Skip to content

Commit

Permalink
Refs #2635. Sending n, n-1 heartbeat when history is empty (#330)
Browse files Browse the repository at this point in the history
  • Loading branch information
MiguelCompany authored and richiware committed Nov 20, 2018
1 parent 2fa445e commit b3716ce
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 14 deletions.
5 changes: 2 additions & 3 deletions include/fastrtps/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,15 @@ namespace eprosima
* @brief Sends a heartbeat to a remote reader.
* @remarks This function is non thread-safe.
*/
void send_heartbeat_to_nts(ReaderProxy& remoteReaderProxy, bool final = false,
bool send_empty_history_info = false);
void send_heartbeat_to_nts(ReaderProxy& remoteReaderProxy, bool final = false);

void process_acknack(const GUID_t reader_guid, uint32_t ack_count,
const SequenceNumberSet_t& sn_set, bool final_flag);

private:

void send_heartbeat_nts_(const std::vector<GUID_t>& remote_readers, const LocatorList_t& locators,
RTPSMessageGroup& message_group, bool final = false, bool send_empty_history_info = false);
RTPSMessageGroup& message_group, bool final = false);

void check_acked_status();

Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/messages/MessageReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ bool MessageReceiver::proc_Submsg_Heartbeat(CDRMessage_t* msg,SubmessageHeader_t
SequenceNumber_t firstSN, lastSN;
CDRMessage::readSequenceNumber(msg,&firstSN);
CDRMessage::readSequenceNumber(msg,&lastSN);
if(lastSN < firstSN && lastSN != SequenceNumber_t(0, 0))
if(lastSN < firstSN && lastSN != firstSN-1)
{
logWarning(RTPS_MSG_IN, IDSTRING"Invalid Heartbeat received (" << firstSN << ") - (" <<
lastSN << "), ignoring");
Expand Down
18 changes: 8 additions & 10 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,14 +513,14 @@ bool StatefulWriter::matched_reader_add(RemoteReaderAttributes& rdata)
// The state has to be updated.
this->mp_periodicHB->restart_timer();
}
else if(rdata.is_eprosima_endpoint)
else
{
RTPSMessageGroup group(mp_RTPSParticipant, this, RTPSMessageGroup::WRITER, m_cdrmessages);
LocatorList_t locatorsList(rp->m_att.endpoint.unicastLocatorList);
locatorsList.push_back(rp->m_att.endpoint.multicastLocatorList);

// Send initial heartbeat
send_heartbeat_nts_({rp->m_att.guid}, locatorsList, group, false, true);
send_heartbeat_nts_({rp->m_att.guid}, locatorsList, group, false);
}

matched_readers.push_back(rp);
Expand Down Expand Up @@ -833,19 +833,17 @@ SequenceNumber_t StatefulWriter::next_sequence_number() const
return mp_history->next_sequence_number();
}

void StatefulWriter::send_heartbeat_to_nts(ReaderProxy& remoteReaderProxy, bool final,
bool send_empty_history_info)
void StatefulWriter::send_heartbeat_to_nts(ReaderProxy& remoteReaderProxy, bool final)
{
RTPSMessageGroup group(mp_RTPSParticipant, this, RTPSMessageGroup::WRITER, m_cdrmessages);
LocatorList_t locators(remoteReaderProxy.m_att.endpoint.unicastLocatorList);
locators.push_back(remoteReaderProxy.m_att.endpoint.multicastLocatorList);

send_heartbeat_nts_({remoteReaderProxy.m_att.guid},
locators, group, final, send_empty_history_info);
send_heartbeat_nts_({remoteReaderProxy.m_att.guid}, locators, group, final);
}

void StatefulWriter::send_heartbeat_nts_(const std::vector<GUID_t>& remote_readers, const LocatorList_t &locators,
RTPSMessageGroup& message_group, bool final, bool send_empty_history_info)
RTPSMessageGroup& message_group, bool final)
{
SequenceNumber_t firstSeq = get_seq_num_min();
SequenceNumber_t lastSeq = get_seq_num_max();
Expand All @@ -854,10 +852,10 @@ void StatefulWriter::send_heartbeat_nts_(const std::vector<GUID_t>& remote_reade
{
assert(firstSeq == c_SequenceNumber_Unknown && lastSeq == c_SequenceNumber_Unknown);

if(send_empty_history_info && remote_readers.size() == 1)
if(remote_readers.size() == 1)
{
firstSeq = next_sequence_number();
lastSeq = {0, 0};
lastSeq = firstSeq - 1;
}
else
{
Expand Down Expand Up @@ -910,7 +908,7 @@ void StatefulWriter::process_acknack(const GUID_t reader_guid, uint32_t ack_coun
}
else if(sn_set.isSetEmpty() && !final_flag)
{
send_heartbeat_to_nts(*remote_reader, true, remote_reader->m_att.is_eprosima_endpoint);
send_heartbeat_to_nts(*remote_reader, true);
}

// Check if all CacheChange are acknowledge, because a user could be waiting
Expand Down

0 comments on commit b3716ce

Please sign in to comment.