Skip to content

Commit

Permalink
Fix encapsulation format in WLP (#3784)
Browse files Browse the repository at this point in the history
* Refs #19347: Fix encapsulation format in WLP. Improve WLP checks

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Refs #19347: Correctly set CDR endianess for BE

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Refs #19347: Uncrustify

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Refs #19347: Fix doxygen

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Refs #19347: Apply suggestions

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Refs #19347: Fix Windows warning

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Refs #19347: Apply suggestions

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Refs #19347: Correct condition when setting the payload encapsulation

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Refs #19347: Remove legacy typedef

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

* Refs #19347: Initialize uint32_t variable

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>

---------

Signed-off-by: Eduardo Ponz <eduardoponz@eprosima.com>
(cherry picked from commit 3a83d7c)
  • Loading branch information
EduPonz authored and mergify[bot] committed Aug 7, 2023
1 parent aa4556d commit ffd7215
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 56 deletions.
54 changes: 35 additions & 19 deletions include/fastdds/rtps/builtin/liveliness/WLPListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
#define _FASTDDS_RTPS_WLPLISTENER_H_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <fastdds/rtps/reader/ReaderListener.h>
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/GuidPrefix_t.hpp>
#include <fastdds/rtps/common/InstanceHandle.h>

#include <fastdds/rtps/common/Types.h>
#include <fastdds/rtps/reader/ReaderListener.h>
#include <fastrtps/qos/QosPolicies.h>

namespace eprosima {
namespace fastrtps{
namespace fastrtps {
namespace rtps {

class WLP;
Expand All @@ -39,14 +39,16 @@ struct CacheChange_t;
* Class WLPListener that receives the liveliness messages asserting the liveliness of remote endpoints.
* @ingroup LIVELINESS_MODULE
*/
class WLPListener: public ReaderListener {
class WLPListener : public ReaderListener
{
public:

/**
* @brief Constructor
* @param pwlp Pointer to the writer liveliness protocol
*/
WLPListener(WLP* pwlp);
WLPListener(
WLP* pwlp);

/**
* @brief Destructor
Expand All @@ -60,27 +62,41 @@ class WLPListener: public ReaderListener {
*/
void onNewCacheChangeAdded(
RTPSReader* reader,
const CacheChange_t* const change) override;
const CacheChange_t* const change) override;

private:

/**
* Separate the Key between the GuidPrefix_t and the liveliness Kind
* @param key InstanceHandle_t to separate.
* @param guidP GuidPrefix_t pointer to store the info.
* @param liveliness Liveliness Kind Pointer.
* @return True if correctly separated.
*/
* Separate the Key between the GuidPrefix_t and the liveliness Kind
* @param key InstanceHandle_t to separate.
* @param guidP GuidPrefix_t pointer to store the info.
* @param liveliness Liveliness Kind Pointer.
* @return True if correctly separated.
*/
bool separateKey(
InstanceHandle_t& key,
GuidPrefix_t* guidP,
LivelinessQosPolicyKind* liveliness);

/**
* Compute the key from a CacheChange_t
* @param change
*/
bool computeKey(CacheChange_t* change);
* Compute the key from a CacheChange_t
* @param change
*/
bool computeKey(
CacheChange_t* change);

/**
* @brief Check that the ParticipantMessageData kind is a valid one for WLP and extract the liveliness kind.
*
* @param[in] serialized_kind A pointer to the first octet of the kind array. The function assumes 4 elements
* in the array.
* @param[out] liveliness_kind A reference to the LivelinessQosPolicyKind.
*
* @return True if the kind corresponds with one for WLP, false otherwise.
*/
bool get_wlp_kind(
const octet* serialized_kind,
LivelinessQosPolicyKind& liveliness_kind);

//! A pointer to the writer liveliness protocol
WLP* mp_WLP;
Expand All @@ -89,6 +105,6 @@ class WLPListener: public ReaderListener {

} /* namespace rtps */
} /* namespace eprosima */
}
#endif
} // namespace eprosima
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* _FASTDDS_RTPS_WLPLISTENER_H_ */
6 changes: 5 additions & 1 deletion include/fastdds/rtps/common/CDRMessage_t.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ struct RTPS_DllAPI CDRMessage_t final
const SerializedPayload_t& payload)
: wraps(true)
{
msg_endian = payload.encapsulation == PL_CDR_BE ? BIGEND : LITTLEEND;
msg_endian = LITTLEEND;
if (payload.encapsulation == PL_CDR_BE || payload.encapsulation == CDR_BE)
{
msg_endian = BIGEND;
}
pos = payload.pos;
length = payload.length;
buffer = payload.data;
Expand Down
2 changes: 1 addition & 1 deletion include/fastdds/rtps/common/SerializedPayload.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace rtps {
#define PL_CDR_LE 0x0003

#if FASTDDS_IS_BIG_ENDIAN_TARGET
#define DEFAULT_ENCAPSULATION CDR_LE
#define DEFAULT_ENCAPSULATION CDR_BE
#define PL_DEFAULT_ENCAPSULATION PL_CDR_BE
#else
#define DEFAULT_ENCAPSULATION CDR_LE
Expand Down
11 changes: 11 additions & 0 deletions include/fastdds/rtps/messages/CDRMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,17 @@ inline bool addParticipantGenericMessage(

///@}

/**
* @brief Skip bytes in serialized buffer
*
* @param msg The CDR message
* @param length The number of bytes to skip
* @return true if skipped, false otherwise
*/
inline bool skip(
CDRMessage_t* msg,
uint32_t length);

} /* namespace CDRMessage */

} /* namespace rtps */
Expand Down
14 changes: 14 additions & 0 deletions include/fastdds/rtps/messages/CDRMessage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,20 @@ inline bool CDRMessage::readParticipantGenericMessage(
return true;
}

inline bool CDRMessage::skip(
CDRMessage_t* msg,
uint32_t length)
{
// Validate input
bool ret = (msg != nullptr) && (msg->pos + length <= msg->length);
if (ret)
{
// Advance index the number of specified bytes
msg->pos += length;
}
return ret;
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/builtin/liveliness/WLP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -900,9 +900,9 @@ bool WLP::send_liveliness_message(

if (change != nullptr)
{
change->serializedPayload.encapsulation = (uint16_t)PL_DEFAULT_ENCAPSULATION;
change->serializedPayload.encapsulation = (uint16_t)DEFAULT_ENCAPSULATION;
change->serializedPayload.data[0] = 0;
change->serializedPayload.data[1] = PL_DEFAULT_ENCAPSULATION;
change->serializedPayload.data[1] = DEFAULT_ENCAPSULATION;
change->serializedPayload.data[2] = 0;
change->serializedPayload.data[3] = 0;

Expand Down
123 changes: 90 additions & 33 deletions src/cpp/rtps/builtin/liveliness/WLPListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,33 @@
* @file WLPListener.cpp
*
*/

#include <fastdds/rtps/builtin/liveliness/WLPListener.h>
#include <fastdds/rtps/builtin/liveliness/WLP.h>

#include <fastdds/rtps/history/ReaderHistory.h>
#include <cstdint>
#include <cstring>
#include <mutex>
#include <vector>

#include <fastdds/rtps/builtin/discovery/participant/PDPSimple.h>
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/builtin/BuiltinProtocols.h>

#include <fastdds/rtps/reader/StatefulReader.h>
#include <fastdds/rtps/builtin/discovery/participant/PDPSimple.h>
#include <fastdds/rtps/builtin/liveliness/WLP.h>
#include <fastdds/rtps/common/CacheChange.h>
#include <fastdds/rtps/common/CDRMessage_t.h>
#include <fastdds/rtps/common/GuidPrefix_t.hpp>
#include <fastdds/rtps/common/InstanceHandle.h>
#include <fastdds/rtps/common/SerializedPayload.h>
#include <fastdds/rtps/common/Types.h>
#include <fastdds/rtps/history/ReaderHistory.h>
#include <fastdds/rtps/messages/CDRMessage.h>
#include <fastdds/rtps/reader/RTPSReader.h>
#include <fastdds/rtps/writer/LivelinessManager.h>
#include <fastdds/dds/log/Log.hpp>

#include <mutex>


#include <fastrtps/qos/QosPolicies.h>

namespace eprosima {
namespace fastrtps {
namespace rtps {


WLPListener::WLPListener(
WLP* plwp)
: mp_WLP(plwp)
Expand All @@ -48,16 +53,14 @@ WLPListener::~WLPListener()
{
}

typedef std::vector<WriterProxy*>::iterator WPIT;

void WLPListener::onNewCacheChangeAdded(
RTPSReader* reader,
const CacheChange_t* const changeIN)
{
std::lock_guard<std::recursive_mutex> guard2(*mp_WLP->mp_builtinProtocols->mp_PDP->getMutex());

GuidPrefix_t guidP;
LivelinessQosPolicyKind livelinessKind;
LivelinessQosPolicyKind livelinessKind = AUTOMATIC_LIVELINESS_QOS;
CacheChange_t* change = (CacheChange_t*)changeIN;
if (!computeKey(change))
{
Expand All @@ -74,23 +77,50 @@ void WLPListener::onNewCacheChangeAdded(
break;
}
}
if (change->serializedPayload.length > 0)
{
if (PL_CDR_BE == change->serializedPayload.data[1])
{
change->serializedPayload.encapsulation = (uint16_t)PL_CDR_BE;
}
else
{
change->serializedPayload.encapsulation = (uint16_t)PL_CDR_LE;
}

for (size_t i = 0; i < 12; ++i)
// Serialized payload should have at least 4 bytes of representation header, 12 of GuidPrefix,
// 4 of kind, and 4 of length.
constexpr uint32_t participant_msg_data_kind_size = 4;
constexpr uint32_t participant_msg_data_length_size = 4;
constexpr uint32_t min_serialized_length = SerializedPayload_t::representation_header_size
+ GuidPrefix_t::size
+ participant_msg_data_kind_size
+ participant_msg_data_length_size;

if (change->serializedPayload.length >= min_serialized_length)
{
constexpr uint32_t participant_msg_data_kind_pos = 16;
constexpr uint32_t encapsulation_pos = 1;
uint32_t data_length = 0;

// Extract encapsulation from the second byte of the representation header. Done prior to
// creating the CDRMessage_t, as the CDRMessage_t ctor uses it for its own state.
change->serializedPayload.encapsulation =
static_cast<uint16_t>(change->serializedPayload.data[encapsulation_pos]);

// Create CDR message from buffer to deserialize contents for further validation
CDRMessage_t cdr_message(change->serializedPayload);

bool message_ok = (
// Skip representation header
CDRMessage::skip(&cdr_message, SerializedPayload_t::representation_header_size)
// Extract GuidPrefix
&& CDRMessage::readData(&cdr_message, guidP.value, GuidPrefix_t::size)
// Skip kind, it will be validated later
&& CDRMessage::skip(&cdr_message, participant_msg_data_kind_size)
// Extract and validate liveliness kind
&& get_wlp_kind(&change->serializedPayload.data[participant_msg_data_kind_pos], livelinessKind)
// Extract data length
&& CDRMessage::readUInt32(&cdr_message, &data_length)
// Check that serialized length is correctly set
&& (change->serializedPayload.length >= min_serialized_length + data_length));

if (!message_ok)
{
guidP.value[i] = change->serializedPayload.data[i + 4];
EPROSIMA_LOG_INFO(RTPS_LIVELINESS, "Ignoring incorrect WLP ParticipantDataMessage");
history->remove_change(change);
return;
}
livelinessKind = (LivelinessQosPolicyKind)(change->serializedPayload.data[19] - 0x01);

}
else
{
Expand All @@ -99,6 +129,8 @@ void WLPListener::onNewCacheChangeAdded(
&guidP,
&livelinessKind))
{
EPROSIMA_LOG_INFO(RTPS_LIVELINESS, "Ignoring not WLP ParticipantDataMessage");
history->remove_change(change);
return;
}
}
Expand Down Expand Up @@ -130,12 +162,13 @@ bool WLPListener::separateKey(
GuidPrefix_t* guidP,
LivelinessQosPolicyKind* liveliness)
{
for (uint8_t i = 0; i < 12; ++i)
bool ret = get_wlp_kind(&key.value[12], *liveliness);
if (ret)
{
guidP->value[i] = key.value[i];
// Extract GuidPrefix
memcpy(guidP->value, key.value, 12);
}
*liveliness = (LivelinessQosPolicyKind)key.value[15];
return true;
return ret;
}

bool WLPListener::computeKey(
Expand All @@ -154,6 +187,30 @@ bool WLPListener::computeKey(
return true;
}

bool WLPListener::get_wlp_kind(
const octet* serialized_kind,
LivelinessQosPolicyKind& liveliness_kind)
{
/*
* From RTPS 2.5 9.6.3.1, the ParticipantMessageData kinds for WLP are:
* - PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE {0x00, 0x00, 0x00, 0x01}
* - PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE {0x00, 0x00, 0x00, 0x02}
*/
bool is_wlp = (
serialized_kind[0] == 0
&& serialized_kind[1] == 0
&& serialized_kind[2] == 0
&& (serialized_kind[3] == 0x01 || serialized_kind[3] == 0x02));

if (is_wlp)
{
// Adjust and cast to LivelinessQosPolicyKind enum, where AUTOMATIC_LIVELINESS_QOS == 0
liveliness_kind = static_cast<LivelinessQosPolicyKind>(serialized_kind[3] - 0x01);
}

return is_wlp;
}

} /* namespace rtps */
} /* namespace eprosima */
} // namespace eprosima

0 comments on commit ffd7215

Please sign in to comment.