Skip to content

Commit

Permalink
Advanced Configuration Example with partitions and ownership (#2947)
Browse files Browse the repository at this point in the history
* Implement Advance Configuration Example with partitions

Signed-off-by: jparisu <javierparis@eprosima.com>

* Implement ownership

Signed-off-by: jparisu <javierparis@eprosima.com>

* Add KeepAll History to reliable

Signed-off-by: jparisu <javierparis@eprosima.com>

* Apply suggestions

Signed-off-by: Juan López Fernández <juanlopez@eprosima.com>

* Uncrustify

Signed-off-by: Juan López Fernández <juanlopez@eprosima.com>

* Add guid information to stdout logs

Signed-off-by: jparisu <javierparis@eprosima.com>

Signed-off-by: jparisu <javierparis@eprosima.com>
Signed-off-by: Juan López Fernández <juanlopez@eprosima.com>
Co-authored-by: Juan López Fernández <juanlopez@eprosima.com>
  • Loading branch information
jparisu and juanlofer-eprosima authored Sep 27, 2022
1 parent 5318ccb commit 64797cd
Show file tree
Hide file tree
Showing 15 changed files with 2,388 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,357 @@
// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file AdvancedConfigurationPublisher.cpp
*
*/

#include <csignal>
#include <thread>
#include <sstream>

#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/publisher/DataWriter.hpp>
#include <fastdds/dds/publisher/Publisher.hpp>
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
#include <fastdds/dds/publisher/qos/PublisherQos.hpp>
#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h>
#include <fastdds/rtps/transport/UDPv4TransportDescriptor.h>
#include <fastdds/rtps/transport/UDPv6TransportDescriptor.h>
#include <fastrtps/attributes/ParticipantAttributes.h>
#include <fastrtps/attributes/PublisherAttributes.h>

#include "AdvancedConfigurationPublisher.h"

using namespace eprosima::fastdds::dds;
using namespace eprosima::fastdds::rtps;

std::atomic<bool> HelloWorldPublisher::stop_(false);
std::mutex HelloWorldPublisher::PubListener::wait_matched_cv_mtx_;
std::condition_variable HelloWorldPublisher::PubListener::wait_matched_cv_;

HelloWorldPublisher::HelloWorldPublisher()
: participant_(nullptr)
, publisher_(nullptr)
, topic_(nullptr)
, writer_(nullptr)
, type_(new HelloWorldPubSubType())
{
}

bool HelloWorldPublisher::is_stopped()
{
return stop_;
}

void HelloWorldPublisher::stop()
{
stop_ = true;
PubListener::awake();
}

bool HelloWorldPublisher::init(
const std::string& topic_name,
uint32_t domain,
uint32_t num_wait_matched,
bool async,
TransportType transport,
bool reliable,
bool transient,
int hops,
const std::string& partitions,
bool use_ownership,
unsigned int ownership_strength /* = 0 */)
{
hello_.index(0);
memcpy(hello_.message().data(), "HelloWorld ", strlen("HelloWorld") + 1);

DomainParticipantQos pqos;
pqos.name("Participant_pub");
listener_.set_num_wait_matched(num_wait_matched);

// TRANSPORT CONFIG
// If it is set, not use default and set the transport
if (transport != DEFAULT || hops > 0 )
{
pqos.transport().use_builtin_transports = false;

switch ( transport )
{
case SHM:
{
auto shm_transport = std::make_shared<SharedMemTransportDescriptor>();
pqos.transport().user_transports.push_back(shm_transport);
}
break;
case UDPv4:
{
auto udp_transport = std::make_shared<UDPv4TransportDescriptor>();
pqos.transport().user_transports.push_back(udp_transport);
}
break;
case UDPv6:
{
auto udp_transport = std::make_shared<UDPv6TransportDescriptor>();
pqos.transport().user_transports.push_back(udp_transport);
}
break;
case DEFAULT:
default:
{
// mimick default transport selection
auto udp_transport = std::make_shared<UDPv4TransportDescriptor>();
pqos.transport().user_transports.push_back(udp_transport);
#ifdef SHM_TRANSPORT_BUILTIN
auto shm_transport = std::make_shared<SharedMemTransportDescriptor>();
pqos.transport().user_transports.push_back(shm_transport);
#endif // SHM_TRANSPORT_BUILTIN
}
}

if ( hops > 0 )
{
for (auto& transportDescriptor : pqos.transport().user_transports)
{
SocketTransportDescriptor* pT = dynamic_cast<SocketTransportDescriptor*>(transportDescriptor.get());
if (pT)
{
pT->TTL = (uint8_t)std::min(hops, 255);
}
}
}
}

// CREATE THE PARTICIPANT
participant_ = DomainParticipantFactory::get_instance()->create_participant(domain, pqos);

if (participant_ == nullptr)
{
return false;
}

// REGISTER THE TYPE
type_.register_type(participant_);

// CREATE THE PUBLISHER
PublisherQos pbqos;

if (!partitions.empty())
{
// Divide in partitions by ;
std::stringstream spartitions(partitions);
std::string partition_cut;
while (std::getline(spartitions, partition_cut, ';'))
{
pbqos.partition().push_back(partition_cut.c_str());
}
}

// Create publisher
publisher_ = participant_->create_publisher(pbqos, nullptr);

if (publisher_ == nullptr)
{
return false;
}

// CREATE THE TOPIC
topic_ = participant_->create_topic(topic_name, "HelloWorld", TOPIC_QOS_DEFAULT);

if (topic_ == nullptr)
{
return false;
}

// CREATE THE WRITER
DataWriterQos wqos = DATAWRITER_QOS_DEFAULT;

// Data sharing set in endpoint. If it is not default, set it to off
if (transport != DEFAULT)
{
wqos.data_sharing().off();
}
else
{
wqos.data_sharing().automatic(); // default
}

if (async)
{
wqos.publish_mode().kind = ASYNCHRONOUS_PUBLISH_MODE;
}
else
{
wqos.publish_mode().kind = SYNCHRONOUS_PUBLISH_MODE; // default
}

if (reliable)
{
wqos.reliability().kind = RELIABLE_RELIABILITY_QOS;
wqos.history().kind = KEEP_ALL_HISTORY_QOS;
}
else
{
wqos.reliability().kind = BEST_EFFORT_RELIABILITY_QOS; // default in this example (although default value for
// writters' qos actually is RELIABLE)
}

if (transient)
{
wqos.durability().kind = TRANSIENT_LOCAL_DURABILITY_QOS;
wqos.history().kind = KEEP_ALL_HISTORY_QOS; // store previously sent samples so they can be resent to newly
// matched DataReaders
}
else
{
wqos.durability().kind = VOLATILE_DURABILITY_QOS; // default in this example (although default value for
// writters' qos actually is TRANSIENT_LOCAL)
}

// Set ownership
if (use_ownership)
{
wqos.ownership().kind = OwnershipQosPolicyKind::EXCLUSIVE_OWNERSHIP_QOS;
wqos.ownership_strength().value = ownership_strength;
}

writer_ = publisher_->create_datawriter(topic_, wqos, &listener_);

if (writer_ == nullptr)
{
return false;
}

std::cout << "Publisher Participant created with DataWriter Guid [ " << writer_->guid() << " ]." << std::endl;
return true;
}

HelloWorldPublisher::~HelloWorldPublisher()
{
if (participant_ != nullptr)
{
if (publisher_ != nullptr)
{
if (writer_ != nullptr)
{
publisher_->delete_datawriter(writer_);
}
participant_->delete_publisher(publisher_);
}
if (topic_ != nullptr)
{
participant_->delete_topic(topic_);
}
DomainParticipantFactory::get_instance()->delete_participant(participant_);
}
}

void HelloWorldPublisher::PubListener::on_publication_matched(
eprosima::fastdds::dds::DataWriter*,
const eprosima::fastdds::dds::PublicationMatchedStatus& info)
{
if (info.current_count_change == 1)
{
matched_ = info.current_count;
std::cout << "Publisher matched [ " << iHandle2GUID(info.last_subscription_handle) << " ]." << std::endl;
if (enough_matched())
{
awake();
}
}
else if (info.current_count_change == -1)
{
matched_ = info.current_count;
std::cout << "Publisher unmatched [ " << iHandle2GUID(info.last_subscription_handle) << " ]." << std::endl;
}
else
{
std::cout << info.current_count_change
<< " is not a valid value for PublicationMatchedStatus current count change" << std::endl;
}
}

void HelloWorldPublisher::PubListener::set_num_wait_matched(
uint32_t num_wait_matched)
{
num_wait_matched_ = num_wait_matched;
}

bool HelloWorldPublisher::PubListener::enough_matched()
{
return matched_ >= num_wait_matched_;
}

void HelloWorldPublisher::PubListener::wait()
{
std::unique_lock<std::mutex> lck(wait_matched_cv_mtx_);
wait_matched_cv_.wait(lck, [this]
{
return enough_matched() || is_stopped();
});
}

void HelloWorldPublisher::PubListener::awake()
{
wait_matched_cv_.notify_all();
}

void HelloWorldPublisher::runThread(
uint32_t samples,
uint32_t sleep)
{
while (!is_stopped() && (samples == 0 || hello_.index() < samples))
{
if (listener_.enough_matched())
{
publish();
std::cout << "Message: " << hello_.message().data() << " with index: " << hello_.index()
<< " SENT" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
}
else
{
listener_.wait();
}
}
}

void HelloWorldPublisher::run(
uint32_t samples,
uint32_t sleep)
{
stop_ = false;
std::thread thread(&HelloWorldPublisher::runThread, this, samples, sleep);
if (samples == 0)
{
std::cout << "Publisher running. Please press CTRL+C to stop the Publisher at any time." << std::endl;
}
else
{
std::cout << "Publisher running " << samples <<
" samples. Please press CTRL+C to stop the Publisher at any time." << std::endl;
}
signal(SIGINT, [](int signum)
{
std::cout << "SIGINT received, stopping Publisher execution." << std::endl;
static_cast<void>(signum); HelloWorldPublisher::stop();
});
thread.join();
}

void HelloWorldPublisher::publish()
{
hello_.index(hello_.index() + 1);
writer_->write(&hello_);
}
Loading

0 comments on commit 64797cd

Please sign in to comment.