Skip to content

Commit

Permalink
Feature/services timestamps (#369)
Browse files Browse the repository at this point in the history
* remember the sample info on requests and responses

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>

* This implements services timestamps

Requires ros2/rmw#217
Realizes the 2nd part of ros2/design#259

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>

* use rmw_time_point_value_t instead of rmw_time_t

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>

* snake_case

Signed-off-by: Luetkebohle Ingo (CR/AEX3) <ingo.luetkebohle@de.bosch.com>
  • Loading branch information
iluetkeb authored Apr 24, 2020
1 parent d1702b1 commit aad4762
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 19 deletions.
2 changes: 1 addition & 1 deletion rmw_fastrtps_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ rmw_send_request(
rmw_ret_t
rmw_take_request(
const rmw_service_t * service,
rmw_request_id_t * request_header,
rmw_service_info_t * request_header,
void * ros_request,
bool * taken)
{
Expand Down
2 changes: 1 addition & 1 deletion rmw_fastrtps_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ extern "C"
rmw_ret_t
rmw_take_response(
const rmw_client_t * client,
rmw_request_id_t * request_header,
rmw_service_info_t * request_header,
void * ros_response,
bool * taken)
{
Expand Down
2 changes: 1 addition & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ rmw_send_request(
rmw_ret_t
rmw_take_request(
const rmw_service_t * service,
rmw_request_id_t * request_header,
rmw_service_info_t * request_header,
void * ros_request,
bool * taken)
{
Expand Down
2 changes: 1 addition & 1 deletion rmw_fastrtps_dynamic_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ extern "C"
rmw_ret_t
rmw_take_response(
const rmw_client_t * client,
rmw_request_id_t * request_header,
rmw_service_info_t * request_header,
void * ros_response,
bool * taken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ typedef struct CustomClientResponse
{
eprosima::fastrtps::rtps::SampleIdentity sample_identity_;
std::unique_ptr<eprosima::fastcdr::FastBuffer> buffer_;
eprosima::fastrtps::SampleInfo_t sample_info_ {};
} CustomClientResponse;

class ClientListener : public eprosima::fastrtps::SubscriberListener
Expand All @@ -78,15 +79,14 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
CustomClientResponse response;
// Todo(sloretz) eliminate heap allocation pending eprosima/Fast-CDR#19
response.buffer_.reset(new eprosima::fastcdr::FastBuffer());
eprosima::fastrtps::SampleInfo_t sinfo;

rmw_fastrtps_shared_cpp::SerializedData data;
data.is_cdr_buffer = true;
data.data = response.buffer_.get();
data.impl = nullptr; // not used when is_cdr_buffer is true
if (sub->takeNextData(&data, &sinfo)) {
if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
response.sample_identity_ = sinfo.related_sample_identity;
if (sub->takeNextData(&data, &response.sample_info_)) {
if (eprosima::fastrtps::rtps::ALIVE == response.sample_info_.sampleKind) {
response.sample_identity_ = response.sample_info_.related_sample_identity;

if (response.sample_identity_.writer_guid() == info_->writer_guid_) {
std::lock_guard<std::mutex> lock(internalMutex_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ typedef struct CustomServiceRequest
{
eprosima::fastrtps::rtps::SampleIdentity sample_identity_;
eprosima::fastcdr::FastBuffer * buffer_;
eprosima::fastrtps::SampleInfo_t sample_info_ {};

CustomServiceRequest()
: buffer_(nullptr) {}
Expand All @@ -75,15 +76,14 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener

CustomServiceRequest request;
request.buffer_ = new eprosima::fastcdr::FastBuffer();
eprosima::fastrtps::SampleInfo_t sinfo;

rmw_fastrtps_shared_cpp::SerializedData data;
data.is_cdr_buffer = true;
data.data = request.buffer_;
data.impl = nullptr; // not used when is_cdr_buffer is true
if (sub->takeNextData(&data, &sinfo)) {
if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
request.sample_identity_ = sinfo.sample_identity;
if (sub->takeNextData(&data, &request.sample_info_)) {
if (eprosima::fastrtps::rtps::ALIVE == request.sample_info_.sampleKind) {
request.sample_identity_ = request.sample_info_.sample_identity;

std::lock_guard<std::mutex> lock(internalMutex_);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ rmw_ret_t
__rmw_take_request(
const char * identifier,
const rmw_service_t * service,
rmw_request_id_t * request_header,
rmw_service_info_t * request_header,
void * ros_request,
bool * taken);

Expand All @@ -196,7 +196,7 @@ rmw_ret_t
__rmw_take_response(
const char * identifier,
const rmw_client_t * client,
rmw_request_id_t * request_header,
rmw_service_info_t * request_header,
void * ros_response,
bool * taken);

Expand Down
7 changes: 4 additions & 3 deletions rmw_fastrtps_shared_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ rmw_ret_t
__rmw_take_request(
const char * identifier,
const rmw_service_t * service,
rmw_request_id_t * request_header,
rmw_service_info_t * request_header,
void * ros_request,
bool * taken)
{
Expand Down Expand Up @@ -102,8 +102,9 @@ __rmw_take_request(
// Get header
rmw_fastrtps_shared_cpp::copy_from_fastrtps_guid_to_byte_array(
request.sample_identity_.writer_guid(),
request_header->writer_guid);
request_header->sequence_number = ((int64_t)request.sample_identity_.sequence_number().high) <<
request_header->request_id.writer_guid);
request_header->request_id.sequence_number =
((int64_t)request.sample_identity_.sequence_number().high) <<
32 | request.sample_identity_.sequence_number().low;

delete request.buffer_;
Expand Down
7 changes: 5 additions & 2 deletions rmw_fastrtps_shared_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ rmw_ret_t
__rmw_take_response(
const char * identifier,
const rmw_client_t * client,
rmw_request_id_t * request_header,
rmw_service_info_t * request_header,
void * ros_response,
bool * taken)
{
Expand Down Expand Up @@ -62,7 +62,10 @@ __rmw_take_response(
info->response_type_support_->deserializeROSmessage(
deser, ros_response, info->response_type_support_impl_);

request_header->sequence_number = ((int64_t)response.sample_identity_.sequence_number().high) <<
request_header->source_timestamp = response.sample_info_.sourceTimestamp.to_ns();
request_header->received_timestamp = response.sample_info_.receptionTimestamp.to_ns();
request_header->request_id.sequence_number =
((int64_t)response.sample_identity_.sequence_number().high) <<
32 | response.sample_identity_.sequence_number().low;

*taken = true;
Expand Down

0 comments on commit aad4762

Please sign in to comment.