Skip to content

Commit

Permalink
Change request headers to use rmw_request_id_t on the wire
Browse files Browse the repository at this point in the history
Signed-off-by: Erik Boasson <eb@ilities.com>
  • Loading branch information
eboasson committed May 22, 2020
1 parent b181aba commit fb040c5
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 43 deletions.
4 changes: 2 additions & 2 deletions rmw_cyclonedds_cpp/src/Serialization.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ class CDRWriter : public BaseCDRWriter
if (eversion == EncodingVersion::CDR_Legacy) {
cursor->rebase(+4);
}
cursor->put_bytes(&request.header.guid, sizeof(request.header.guid));
cursor->put_bytes(&request.header.seq, sizeof(request.header.seq));
cursor->put_bytes(&request.header.writer_guid, sizeof(request.header.writer_guid));
cursor->put_bytes(&request.header.sequence_number, sizeof(request.header.sequence_number));

serialize(cursor, request.data, m_root_value_type.get());

Expand Down
47 changes: 15 additions & 32 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,6 @@ struct CddsNode

struct CddsPublisher : CddsEntity
{
dds_instance_handle_t pubiid;
rmw_gid_t gid;
struct ddsi_sertopic * sertopic;
};
Expand Down Expand Up @@ -1747,20 +1746,12 @@ static CddsPublisher * create_cdds_publisher(
RMW_SET_ERROR_MSG("failed to create writer");
goto fail_writer;
}
if (dds_get_instance_handle(pub->enth, &pub->pubiid) < 0) {
RMW_SET_ERROR_MSG("failed to get instance handle for writer");
goto fail_instance_handle;
}
get_entity_gid(pub->enth, pub->gid);
pub->sertopic = stact;
dds_delete_qos(qos);
dds_delete(topic);
return pub;

fail_instance_handle:
if (dds_delete(pub->enth) < 0) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy writer during error handling");
}
fail_writer:
dds_delete_qos(qos);
fail_qos:
Expand Down Expand Up @@ -1864,10 +1855,7 @@ extern "C" rmw_ret_t rmw_get_gid_for_publisher(const rmw_publisher_t * publisher
RET_NULL(gid);
auto pub = static_cast<const CddsPublisher *>(publisher->data);
RET_NULL(pub);
gid->implementation_identifier = eclipse_cyclonedds_identifier;
memset(gid->data, 0, sizeof(gid->data));
assert(sizeof(pub->pubiid) <= sizeof(gid->data));
memcpy(gid->data, &pub->pubiid, sizeof(pub->pubiid));
*gid = pub->gid;
return RMW_RET_OK;
}

Expand Down Expand Up @@ -3023,7 +3011,7 @@ extern "C" rmw_ret_t rmw_wait(
static rmw_ret_t rmw_take_response_request(
CddsCS * cs, rmw_service_info_t * request_header,
void * ros_data, bool * taken, dds_time_t * source_timestamp,
dds_instance_handle_t srcfilter)
const rmw_gid_t *srcfilter)
{
RET_NULL(taken);
RET_NULL(ros_data);
Expand All @@ -3034,17 +3022,15 @@ static rmw_ret_t rmw_take_response_request(
void * wrap_ptr = static_cast<void *>(&wrap);
while (dds_take(cs->sub->enth, &wrap_ptr, &info, 1, 1) == 1) {
if (info.valid_data) {
memset(request_header, 0, sizeof(wrap.header));
assert(sizeof(wrap.header.guid) <= sizeof(request_header->request_id.writer_guid));
memcpy(request_header->request_id.writer_guid, &wrap.header.guid, sizeof(wrap.header.guid));
request_header->request_id.sequence_number = wrap.header.seq;
request_header->request_id = wrap.header;
request_header->source_timestamp = info.source_timestamp;
// TODO(iluetkeb) replace with real received timestamp when available in cyclone
request_header->received_timestamp = 0;
if (source_timestamp) {
*source_timestamp = info.source_timestamp;
}
if (srcfilter == 0 || srcfilter == wrap.header.guid) {
static_assert(sizeof(request_header->request_id.writer_guid) <= sizeof(srcfilter->data), "rmw_request_id::writer_guid must not be larger than rmw_gid_t");
if (srcfilter == nullptr || memcmp(wrap.header.writer_guid, srcfilter->data, sizeof(wrap.header.writer_guid)) == 0) {
*taken = true;
return RMW_RET_OK;
}
Expand All @@ -3064,7 +3050,7 @@ extern "C" rmw_ret_t rmw_take_response(
dds_time_t source_timestamp;
rmw_ret_t ret = rmw_take_response_request(
&info->client, request_header, ros_response, taken,
&source_timestamp, info->client.pub->pubiid);
&source_timestamp, &info->client.pub->gid);

#if REPORT_BLOCKED_REQUESTS
if (ret == RMW_RET_OK && *taken) {
Expand Down Expand Up @@ -3112,7 +3098,7 @@ extern "C" rmw_ret_t rmw_take_request(
}

static rmw_ret_t rmw_send_response_request(
CddsCS * cs, const cdds_request_header_t & header,
CddsCS * cs, const rmw_request_id_t & header,
const void * ros_data)
{
const cdds_request_wrapper_t wrap = {header, const_cast<void *>(ros_data)};
Expand Down Expand Up @@ -3157,7 +3143,7 @@ extern "C" rmw_ret_t rmw_send_response(
while (!check_for_response_reader(info->service.sub->enth, info->service.pub->enth)) {
dds_sleepfor(DDS_MSECS(10));
}
return rmw_send_response_request(&info->service, header, ros_response);
return rmw_send_response_request(&info->service, *request_header, ros_response);
}

extern "C" rmw_ret_t rmw_send_request(
Expand All @@ -3169,9 +3155,11 @@ extern "C" rmw_ret_t rmw_send_request(
RET_NULL(ros_request);
RET_NULL(sequence_id);
auto info = static_cast<CddsClient *>(client->data);
cdds_request_header_t header;
header.guid = info->client.pub->pubiid;
header.seq = *sequence_id = ++next_request_id;
rmw_request_id_t reqid;
static_assert(sizeof(dds_guid_t) <= sizeof(reqid.writer_guid), "dds_guid_t must not be larger than rmw_request_id::writer_guid");
static_assert(sizeof(reqid.writer_guid) <= sizeof(info->client.pub->gid.data), "rmw_request_id::writer_guid must not be larger than rmw_gid_t");
memcpy(reqid.writer_guid, info->client.pub->gid.data, sizeof(reqid.writer_guid));
reqid.sequence_number = *sequence_id = ++next_request_id;

#if REPORT_BLOCKED_REQUESTS
{
Expand All @@ -3180,7 +3168,7 @@ extern "C" rmw_ret_t rmw_send_request(
}
#endif

return rmw_send_response_request(&info->client, header, ros_request);
return rmw_send_response_request(&info->client, reqid, ros_request);
}

static const rosidl_service_type_support_t * get_service_typesupport(
Expand Down Expand Up @@ -3297,10 +3285,6 @@ static rmw_ret_t rmw_init_cs(
RMW_SET_ERROR_MSG("failed to create readcondition");
goto fail_readcond;
}
if (dds_get_instance_handle(pub->enth, &pub->pubiid) < 0) {
RMW_SET_ERROR_MSG("failed to get instance handle for writer");
goto fail_instance_handle;
}
dds_delete_qos(qos);
dds_delete(subtopic);
dds_delete(pubtopic);
Expand All @@ -3309,8 +3293,6 @@ static rmw_ret_t rmw_init_cs(
cs->sub = sub;
return RMW_RET_OK;

fail_instance_handle:
dds_delete(sub->rdcondh);
fail_readcond:
dds_delete(sub->enth);
fail_reader:
Expand Down Expand Up @@ -3664,6 +3646,7 @@ extern "C" rmw_ret_t rmw_service_server_is_available(
return RMW_RET_OK;
}
// all conditions met, there is a service server available
dds_sleepfor(DDS_MSECS(100));
*is_available = true;
return RMW_RET_OK;
}
Expand Down
32 changes: 30 additions & 2 deletions rmw_cyclonedds_cpp/src/serdata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,27 @@ static void serdata_rmw_to_ser_unref(struct ddsi_serdata * dcmn, const ddsrt_iov
ddsi_serdata_unref(static_cast<serdata_rmw *>(dcmn));
}

typedef std::array<uint8_t,
sizeof((reinterpret_cast<rmw_request_id_t *>(0))->writer_guid)> rmw_request_id_writer_guid_t;

static void get_rmw_request_id_writer_guid(
const rmw_request_id_t & reqid,
rmw_request_id_writer_guid_t & guid)
{
for (size_t i = 0; i < sizeof(reqid.writer_guid); i++) {
guid[i] = reqid.writer_guid[i];
}
}

static void set_rmw_request_id_writer_guid(
rmw_request_id_t & reqid,
const rmw_request_id_writer_guid_t & guid)
{
for (size_t i = 0; i < sizeof(reqid.writer_guid); i++) {
reqid.writer_guid[i] = guid[i];
}
}

static bool serdata_rmw_to_sample(
const struct ddsi_serdata * dcmn, void * sample, void ** bufptr,
void * buflim)
Expand Down Expand Up @@ -292,7 +313,12 @@ static bool serdata_rmw_to_sample(
stream -- I haven't checked how it is done in the official RMW implementations, so it is
probably incompatible. */
cdds_request_wrapper_t * const wrap = static_cast<cdds_request_wrapper_t *>(sample);
auto prefix = [wrap](cycdeser & ser) {ser >> wrap->header.guid; ser >> wrap->header.seq;};
auto prefix = [wrap](cycdeser & ser) {
rmw_request_id_writer_guid_t writer_guid;
ser >> writer_guid;
set_rmw_request_id_writer_guid(wrap->header, writer_guid);
ser >> wrap->header.sequence_number;
};
cycdeser sd(d->data(), d->size());
if (using_introspection_c_typesupport(topic->type_support.typesupport_identifier_)) {
auto typed_typesupport =
Expand Down Expand Up @@ -364,7 +390,9 @@ static size_t serdata_rmw_print(
probably incompatible. */
cdds_request_wrapper_t wrap;
auto prefix = [&wrap](cycprint & ser) {
ser >> wrap.header.guid; ser.print_constant(","); ser >> wrap.header.seq;
rmw_request_id_writer_guid_t writer_guid;
get_rmw_request_id_writer_guid(wrap.header, writer_guid);
ser >> writer_guid; ser.print_constant(","); ser >> wrap.header.sequence_number;
};
cycprint sd(buf, bufsize, d->data(), d->size());
if (using_introspection_c_typesupport(topic->type_support.typesupport_identifier_)) {
Expand Down
10 changes: 3 additions & 7 deletions rmw_cyclonedds_cpp/src/serdata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <memory>
#include <string>

#include "rmw/rmw.h" // needed for rmw_request_id_t

#include "TypeSupport2.hpp"
#include "bytewise.hpp"
#include "dds/ddsi/ddsi_serdata.h"
Expand Down Expand Up @@ -60,15 +62,9 @@ class serdata_rmw : public ddsi_serdata
void * data() const {return m_data.get();}
};

typedef struct cdds_request_header
{
uint64_t guid;
int64_t seq;
} cdds_request_header_t;

typedef struct cdds_request_wrapper
{
cdds_request_header_t header;
rmw_request_id_t header;
void * data;
} cdds_request_wrapper_t;

Expand Down

0 comments on commit fb040c5

Please sign in to comment.