Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[19347] Fix encapsulation format in WLP (backport #3784) #3788

Merged
merged 1 commit into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 35 additions & 19 deletions include/fastdds/rtps/builtin/liveliness/WLPListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
#define _FASTDDS_RTPS_WLPLISTENER_H_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <fastdds/rtps/reader/ReaderListener.h>
#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/common/GuidPrefix_t.hpp>
#include <fastdds/rtps/common/InstanceHandle.h>

#include <fastdds/rtps/common/Types.h>
#include <fastdds/rtps/reader/ReaderListener.h>
#include <fastrtps/qos/QosPolicies.h>

namespace eprosima {
namespace fastrtps{
namespace fastrtps {
namespace rtps {

class WLP;
Expand All @@ -39,14 +39,16 @@ struct CacheChange_t;
* Class WLPListener that receives the liveliness messages asserting the liveliness of remote endpoints.
* @ingroup LIVELINESS_MODULE
*/
class WLPListener: public ReaderListener {
class WLPListener : public ReaderListener
{
public:

/**
* @brief Constructor
* @param pwlp Pointer to the writer liveliness protocol
*/
WLPListener(WLP* pwlp);
WLPListener(
WLP* pwlp);

/**
* @brief Destructor
Expand All @@ -60,27 +62,41 @@ class WLPListener: public ReaderListener {
*/
void onNewCacheChangeAdded(
RTPSReader* reader,
const CacheChange_t* const change) override;
const CacheChange_t* const change) override;

private:

/**
* Separate the Key between the GuidPrefix_t and the liveliness Kind
* @param key InstanceHandle_t to separate.
* @param guidP GuidPrefix_t pointer to store the info.
* @param liveliness Liveliness Kind Pointer.
* @return True if correctly separated.
*/
* Separate the Key between the GuidPrefix_t and the liveliness Kind
* @param key InstanceHandle_t to separate.
* @param guidP GuidPrefix_t pointer to store the info.
* @param liveliness Liveliness Kind Pointer.
* @return True if correctly separated.
*/
bool separateKey(
InstanceHandle_t& key,
GuidPrefix_t* guidP,
LivelinessQosPolicyKind* liveliness);

/**
* Compute the key from a CacheChange_t
* @param change
*/
bool computeKey(CacheChange_t* change);
* Compute the key from a CacheChange_t
* @param change
*/
bool computeKey(
CacheChange_t* change);

/**
* @brief Check that the ParticipantMessageData kind is a valid one for WLP and extract the liveliness kind.
*
* @param[in] serialized_kind A pointer to the first octet of the kind array. The function assumes 4 elements
* in the array.
* @param[out] liveliness_kind A reference to the LivelinessQosPolicyKind.
*
* @return True if the kind corresponds with one for WLP, false otherwise.
*/
bool get_wlp_kind(
const octet* serialized_kind,
LivelinessQosPolicyKind& liveliness_kind);

//! A pointer to the writer liveliness protocol
WLP* mp_WLP;
Expand All @@ -89,6 +105,6 @@ class WLPListener: public ReaderListener {

} /* namespace rtps */
} /* namespace eprosima */
}
#endif
} // namespace eprosima
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* _FASTDDS_RTPS_WLPLISTENER_H_ */
6 changes: 5 additions & 1 deletion include/fastdds/rtps/common/CDRMessage_t.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ struct RTPS_DllAPI CDRMessage_t final
const SerializedPayload_t& payload)
: wraps(true)
{
msg_endian = payload.encapsulation == PL_CDR_BE ? BIGEND : LITTLEEND;
msg_endian = LITTLEEND;
if (payload.encapsulation == PL_CDR_BE || payload.encapsulation == CDR_BE)
{
msg_endian = BIGEND;
}
pos = payload.pos;
length = payload.length;
buffer = payload.data;
Expand Down
2 changes: 1 addition & 1 deletion include/fastdds/rtps/common/SerializedPayload.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace rtps {
#define PL_CDR_LE 0x0003

#if FASTDDS_IS_BIG_ENDIAN_TARGET
#define DEFAULT_ENCAPSULATION CDR_LE
#define DEFAULT_ENCAPSULATION CDR_BE
#define PL_DEFAULT_ENCAPSULATION PL_CDR_BE
#else
#define DEFAULT_ENCAPSULATION CDR_LE
Expand Down
11 changes: 11 additions & 0 deletions include/fastdds/rtps/messages/CDRMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,17 @@ inline bool addParticipantGenericMessage(

///@}

/**
* @brief Skip bytes in serialized buffer
*
* @param msg The CDR message
* @param length The number of bytes to skip
* @return true if skipped, false otherwise
*/
inline bool skip(
CDRMessage_t* msg,
uint32_t length);

} /* namespace CDRMessage */

} /* namespace rtps */
Expand Down
14 changes: 14 additions & 0 deletions include/fastdds/rtps/messages/CDRMessage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,20 @@ inline bool CDRMessage::readParticipantGenericMessage(
return true;
}

inline bool CDRMessage::skip(
CDRMessage_t* msg,
uint32_t length)
{
// Validate input
bool ret = (msg != nullptr) && (msg->pos + length <= msg->length);
if (ret)
{
// Advance index the number of specified bytes
msg->pos += length;
}
return ret;
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/builtin/liveliness/WLP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -900,9 +900,9 @@ bool WLP::send_liveliness_message(

if (change != nullptr)
{
change->serializedPayload.encapsulation = (uint16_t)PL_DEFAULT_ENCAPSULATION;
change->serializedPayload.encapsulation = (uint16_t)DEFAULT_ENCAPSULATION;
change->serializedPayload.data[0] = 0;
change->serializedPayload.data[1] = PL_DEFAULT_ENCAPSULATION;
change->serializedPayload.data[1] = DEFAULT_ENCAPSULATION;
change->serializedPayload.data[2] = 0;
change->serializedPayload.data[3] = 0;

Expand Down
123 changes: 90 additions & 33 deletions src/cpp/rtps/builtin/liveliness/WLPListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,33 @@
* @file WLPListener.cpp
*
*/

#include <fastdds/rtps/builtin/liveliness/WLPListener.h>
#include <fastdds/rtps/builtin/liveliness/WLP.h>

#include <fastdds/rtps/history/ReaderHistory.h>
#include <cstdint>
#include <cstring>
#include <mutex>
#include <vector>

#include <fastdds/rtps/builtin/discovery/participant/PDPSimple.h>
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/builtin/BuiltinProtocols.h>

#include <fastdds/rtps/reader/StatefulReader.h>
#include <fastdds/rtps/builtin/discovery/participant/PDPSimple.h>
#include <fastdds/rtps/builtin/liveliness/WLP.h>
#include <fastdds/rtps/common/CacheChange.h>
#include <fastdds/rtps/common/CDRMessage_t.h>
#include <fastdds/rtps/common/GuidPrefix_t.hpp>
#include <fastdds/rtps/common/InstanceHandle.h>
#include <fastdds/rtps/common/SerializedPayload.h>
#include <fastdds/rtps/common/Types.h>
#include <fastdds/rtps/history/ReaderHistory.h>
#include <fastdds/rtps/messages/CDRMessage.h>
#include <fastdds/rtps/reader/RTPSReader.h>
#include <fastdds/rtps/writer/LivelinessManager.h>
#include <fastdds/dds/log/Log.hpp>

#include <mutex>


#include <fastrtps/qos/QosPolicies.h>

namespace eprosima {
namespace fastrtps {
namespace rtps {


WLPListener::WLPListener(
WLP* plwp)
: mp_WLP(plwp)
Expand All @@ -48,16 +53,14 @@ WLPListener::~WLPListener()
{
}

typedef std::vector<WriterProxy*>::iterator WPIT;

void WLPListener::onNewCacheChangeAdded(
RTPSReader* reader,
const CacheChange_t* const changeIN)
{
std::lock_guard<std::recursive_mutex> guard2(*mp_WLP->mp_builtinProtocols->mp_PDP->getMutex());

GuidPrefix_t guidP;
LivelinessQosPolicyKind livelinessKind;
LivelinessQosPolicyKind livelinessKind = AUTOMATIC_LIVELINESS_QOS;
CacheChange_t* change = (CacheChange_t*)changeIN;
if (!computeKey(change))
{
Expand All @@ -74,23 +77,50 @@ void WLPListener::onNewCacheChangeAdded(
break;
}
}
if (change->serializedPayload.length > 0)
{
if (PL_CDR_BE == change->serializedPayload.data[1])
{
change->serializedPayload.encapsulation = (uint16_t)PL_CDR_BE;
}
else
{
change->serializedPayload.encapsulation = (uint16_t)PL_CDR_LE;
}

for (size_t i = 0; i < 12; ++i)
// Serialized payload should have at least 4 bytes of representation header, 12 of GuidPrefix,
// 4 of kind, and 4 of length.
constexpr uint32_t participant_msg_data_kind_size = 4;
constexpr uint32_t participant_msg_data_length_size = 4;
constexpr uint32_t min_serialized_length = SerializedPayload_t::representation_header_size
+ GuidPrefix_t::size
+ participant_msg_data_kind_size
+ participant_msg_data_length_size;

if (change->serializedPayload.length >= min_serialized_length)
{
constexpr uint32_t participant_msg_data_kind_pos = 16;
constexpr uint32_t encapsulation_pos = 1;
uint32_t data_length = 0;

// Extract encapsulation from the second byte of the representation header. Done prior to
// creating the CDRMessage_t, as the CDRMessage_t ctor uses it for its own state.
change->serializedPayload.encapsulation =
static_cast<uint16_t>(change->serializedPayload.data[encapsulation_pos]);

// Create CDR message from buffer to deserialize contents for further validation
CDRMessage_t cdr_message(change->serializedPayload);

bool message_ok = (
// Skip representation header
CDRMessage::skip(&cdr_message, SerializedPayload_t::representation_header_size)
// Extract GuidPrefix
&& CDRMessage::readData(&cdr_message, guidP.value, GuidPrefix_t::size)
// Skip kind, it will be validated later
&& CDRMessage::skip(&cdr_message, participant_msg_data_kind_size)
// Extract and validate liveliness kind
&& get_wlp_kind(&change->serializedPayload.data[participant_msg_data_kind_pos], livelinessKind)
// Extract data length
&& CDRMessage::readUInt32(&cdr_message, &data_length)
// Check that serialized length is correctly set
&& (change->serializedPayload.length >= min_serialized_length + data_length));

if (!message_ok)
{
guidP.value[i] = change->serializedPayload.data[i + 4];
EPROSIMA_LOG_INFO(RTPS_LIVELINESS, "Ignoring incorrect WLP ParticipantDataMessage");
history->remove_change(change);
return;
}
livelinessKind = (LivelinessQosPolicyKind)(change->serializedPayload.data[19] - 0x01);

}
else
{
Expand All @@ -99,6 +129,8 @@ void WLPListener::onNewCacheChangeAdded(
&guidP,
&livelinessKind))
{
EPROSIMA_LOG_INFO(RTPS_LIVELINESS, "Ignoring not WLP ParticipantDataMessage");
history->remove_change(change);
return;
}
}
Expand Down Expand Up @@ -130,12 +162,13 @@ bool WLPListener::separateKey(
GuidPrefix_t* guidP,
LivelinessQosPolicyKind* liveliness)
{
for (uint8_t i = 0; i < 12; ++i)
bool ret = get_wlp_kind(&key.value[12], *liveliness);
if (ret)
{
guidP->value[i] = key.value[i];
// Extract GuidPrefix
memcpy(guidP->value, key.value, 12);
}
*liveliness = (LivelinessQosPolicyKind)key.value[15];
return true;
return ret;
}

bool WLPListener::computeKey(
Expand All @@ -154,6 +187,30 @@ bool WLPListener::computeKey(
return true;
}

bool WLPListener::get_wlp_kind(
const octet* serialized_kind,
LivelinessQosPolicyKind& liveliness_kind)
{
/*
* From RTPS 2.5 9.6.3.1, the ParticipantMessageData kinds for WLP are:
* - PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE {0x00, 0x00, 0x00, 0x01}
* - PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE {0x00, 0x00, 0x00, 0x02}
*/
bool is_wlp = (
serialized_kind[0] == 0
&& serialized_kind[1] == 0
&& serialized_kind[2] == 0
&& (serialized_kind[3] == 0x01 || serialized_kind[3] == 0x02));

if (is_wlp)
{
// Adjust and cast to LivelinessQosPolicyKind enum, where AUTOMATIC_LIVELINESS_QOS == 0
liveliness_kind = static_cast<LivelinessQosPolicyKind>(serialized_kind[3] - 0x01);
}

return is_wlp;
}

} /* namespace rtps */
} /* namespace eprosima */
} // namespace eprosima
Loading