diff --git a/include/fastrtps/transport/UDPTransportInterface.h b/include/fastrtps/transport/UDPTransportInterface.h index 88c19ac85de..78d896f3562 100644 --- a/include/fastrtps/transport/UDPTransportInterface.h +++ b/include/fastrtps/transport/UDPTransportInterface.h @@ -163,6 +163,8 @@ class UDPTransportInterface : public TransportInterface bool OpenAndBindInputSockets(const Locator_t& locator, TransportReceiverInterface* receiver, bool is_multicast, uint32_t maxMsgSize); + UDPChannelResource* CreateInputChannelResource(const std::string& sInterface, const Locator_t& locator, + bool is_multicast, uint32_t maxMsgSize, TransportReceiverInterface* receiver); virtual eProsimaUDPSocket OpenAndBindInputSocket(const std::string& sIp, uint16_t port, bool is_multicast) = 0; bool OpenAndBindOutputSockets(const Locator_t& locator); eProsimaUDPSocket OpenAndBindUnicastOutputSocket(const asio::ip::udp::endpoint& endpoint, uint16_t& port); diff --git a/src/cpp/transport/UDPTransportInterface.cpp b/src/cpp/transport/UDPTransportInterface.cpp index d89056c7e3d..fbaf208a0e2 100644 --- a/src/cpp/transport/UDPTransportInterface.cpp +++ b/src/cpp/transport/UDPTransportInterface.cpp @@ -210,13 +210,8 @@ bool UDPTransportInterface::OpenAndBindInputSockets(const Locator_t& locator, Tr std::vector vInterfaces = GetBindingInterfacesList(); for (std::string sInterface : vInterfaces) { - eProsimaUDPSocket unicastSocket = OpenAndBindInputSocket(sInterface, IPLocator::getPhysicalPort(locator), is_multicast); - UDPChannelResource* pChannelResource = new UDPChannelResource(unicastSocket, maxMsgSize); - pChannelResource->SetMessageReceiver(receiver); - pChannelResource->SetInterface(sInterface); - std::thread* newThread = new std::thread(&UDPTransportInterface::performListenOperation, this, - pChannelResource, locator); - pChannelResource->SetThread(newThread); + UDPChannelResource* pChannelResource; + pChannelResource = CreateInputChannelResource(sInterface, locator, is_multicast, maxMsgSize, receiver); mInputSockets[IPLocator::getPhysicalPort(locator)].push_back(pChannelResource); } } @@ -232,6 +227,19 @@ bool UDPTransportInterface::OpenAndBindInputSockets(const Locator_t& locator, Tr return true; } +UDPChannelResource* UDPTransportInterface::CreateInputChannelResource(const std::string& sInterface, const Locator_t& locator, + bool is_multicast, uint32_t maxMsgSize, TransportReceiverInterface* receiver) +{ + eProsimaUDPSocket unicastSocket = OpenAndBindInputSocket(sInterface, IPLocator::getPhysicalPort(locator), is_multicast); + UDPChannelResource* pChannelResource = new UDPChannelResource(unicastSocket, maxMsgSize); + pChannelResource->SetMessageReceiver(receiver); + pChannelResource->SetInterface(sInterface); + std::thread* newThread = new std::thread(&UDPTransportInterface::performListenOperation, this, + pChannelResource, locator); + pChannelResource->SetThread(newThread); + return pChannelResource; +} + bool UDPTransportInterface::OpenAndBindOutputSockets(const Locator_t& locator) { (void)locator; @@ -289,7 +297,7 @@ bool UDPTransportInterface::OpenAndBindOutputSockets(const Locator_t& locator) { eProsimaUDPSocket unicastSocket = OpenAndBindUnicastOutputSocket(GenerateEndpoint(infoIP.name, port), port); SetSocketOutbountInterface(unicastSocket, infoIP.name); - if (firstInterface) + if (!firstInterface) { getSocketPtr(unicastSocket)->set_option(ip::multicast::enable_loopback(true)); firstInterface = true; @@ -406,25 +414,42 @@ bool UDPTransportInterface::ReleaseInputChannel(const Locator_t& locator, UDPCha try { - Locator_t localLocator; - FillLocalIp(localLocator); - channel->Disable(); - ip::udp::socket socket(mService); - socket.open(GenerateProtocol()); - socket.bind(GenerateLocalEndpoint(localLocator, 0)); - uint16_t port = IPLocator::getPhysicalPort(locator); - // We first send directly to localhost, in case all network interfaces are disabled - // (which would mean that multicast traffic may not be sent) - auto localEndpoint = GenerateLocalEndpoint(localLocator, port); - socket.send_to(asio::buffer("EPRORTPSCLOSE", 13), localEndpoint); + if(IsInterfaceWhiteListEmpty()) + { + Locator_t localLocator; + FillLocalIp(localLocator); + + ip::udp::socket socket(mService); + socket.open(GenerateProtocol()); + socket.bind(GenerateLocalEndpoint(localLocator, 0)); + + // We first send directly to localhost, in case all network interfaces are disabled + // (which would mean that multicast traffic may not be sent) + auto localEndpoint = GenerateLocalEndpoint(localLocator, port); + socket.send_to(asio::buffer("EPRORTPSCLOSE", 13), localEndpoint); + + // We then send to the address of the input locator + auto destinationEndpoint = GenerateLocalEndpoint(locator, port); + socket.send_to(asio::buffer("EPRORTPSCLOSE", 13), destinationEndpoint); + } + else + { + auto interface_address = channel->getSocket()->local_endpoint().address(); + ip::udp::socket socket(mService); + socket.open(GenerateProtocol()); + socket.bind(asio::ip::udp::endpoint(interface_address, 0)); + + auto localEndpoint = ip::udp::endpoint(interface_address, port); + socket.send_to(asio::buffer("EPRORTPSCLOSE", 13), localEndpoint); - // We then send to the address of the input locator - auto destinationEndpoint = GenerateLocalEndpoint(locator, port); - socket.send_to(asio::buffer("EPRORTPSCLOSE", 13), destinationEndpoint); + // We then send to the address of the input locator + auto destinationEndpoint = GenerateLocalEndpoint(locator, port); + socket.send_to(asio::buffer("EPRORTPSCLOSE", 13), destinationEndpoint); + } } catch (const std::exception& error) { diff --git a/src/cpp/transport/UDPv4Transport.cpp b/src/cpp/transport/UDPv4Transport.cpp index 5a5a4f48489..01128f06340 100644 --- a/src/cpp/transport/UDPv4Transport.cpp +++ b/src/cpp/transport/UDPv4Transport.cpp @@ -273,42 +273,88 @@ bool UDPv4Transport::OpenInputChannel(const Locator_t& locator, TransportReceive if (IPLocator::isMulticast(locator) && IsInputChannelOpen(locator)) { - // The multicast group will be joined silently, because we do not - // want to return another resource. - auto& channelResources = mInputSockets.at(IPLocator::getPhysicalPort(locator)); - for (auto& channelResource : channelResources) + std::string locatorAddressStr = IPLocator::toIPv4string(locator); + ip::address_v4 locatorAddress = ip::address_v4::from_string(locatorAddressStr); + +#ifndef _WIN32 + if (!IsInterfaceWhiteListEmpty()) { - if (channelResource->GetInterface() == s_IPv4AddressAny) + // Either wildcard address or the multicast address needs to be bound on non-windows systems + bool found = false; + + // First check if the multicast address is already bound + auto& channelResources = mInputSockets.at(IPLocator::getPhysicalPort(locator)); + for (UDPChannelResource* channelResource : channelResources) { - std::vector locNames; - GetIP4sUniqueInterfaces(locNames, true); - for (const auto& infoIP : locNames) + if (channelResource->GetInterface() == locatorAddressStr) { - auto ip = asio::ip::address_v4::from_string(infoIP.name); - try - { - channelResource->getSocket()->set_option( - ip::multicast::join_group(ip::address_v4::from_string(IPLocator::toIPv4string(locator)), ip)); - } - catch (std::system_error& ex) + found = true; + break; + } + } + + // Create a new resource if no one is found + if (!found) + { + try + { + // Bind to multicast address + UDPChannelResource* pChannelResource; + pChannelResource = CreateInputChannelResource(locatorAddressStr, locator, true, maxMsgSize, receiver); + mInputSockets[IPLocator::getPhysicalPort(locator)].push_back(pChannelResource); + + // Join group on all whitelisted interfaces + for (auto& ip : mInterfaceWhiteList) { - (void)ex; - logWarning(RTPS_MSG_OUT, "Error joining multicast group on " << ip << ": " << ex.what()); + pChannelResource->getSocket()->set_option(ip::multicast::join_group(locatorAddress, ip)); } } + catch (asio::system_error const& e) + { + (void)e; + logWarning(RTPS_MSG_OUT, "UDPTransport Error binding " << locatorAddressStr << " at port: (" << IPLocator::getPhysicalPort(locator) << ")" + << " with msg: " << e.what()); + } } - else + } + else +#endif + { + // The multicast group will be joined silently, because we do not + // want to return another resource. + auto& channelResources = mInputSockets.at(IPLocator::getPhysicalPort(locator)); + for (UDPChannelResource* channelResource : channelResources) { - auto ip = asio::ip::address_v4::from_string(channelResource->GetInterface()); - try + if (channelResource->GetInterface() == s_IPv4AddressAny) { - channelResource->getSocket()->set_option( - ip::multicast::join_group(ip::address_v4::from_string(IPLocator::toIPv4string(locator)), ip)); + std::vector locNames; + GetIP4sUniqueInterfaces(locNames, true); + for (const auto& infoIP : locNames) + { + auto ip = asio::ip::address_v4::from_string(infoIP.name); + try + { + channelResource->getSocket()->set_option(ip::multicast::join_group(locatorAddress, ip)); + } + catch (std::system_error& ex) + { + (void)ex; + logWarning(RTPS_MSG_OUT, "Error joining multicast group on " << ip << ": " << ex.what()); + } + } } - catch (std::system_error& ex) + else { - (void)ex; - logWarning(RTPS_MSG_OUT, "Error joining multicast group on " << ip << ": " << ex.what()); + auto ip = asio::ip::address_v4::from_string(channelResource->GetInterface()); + try + { + channelResource->getSocket()->set_option(ip::multicast::join_group(locatorAddress, ip)); + } + catch (std::system_error& ex) + { + (void)ex; + logWarning(RTPS_MSG_OUT, "Error joining multicast group on " << ip << ": " << ex.what()); + } } } } diff --git a/test/blackbox/BlackboxTests.cpp b/test/blackbox/BlackboxTests.cpp index 46e6e2bf477..ae18bf7e094 100644 --- a/test/blackbox/BlackboxTests.cpp +++ b/test/blackbox/BlackboxTests.cpp @@ -12,23 +12,57 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "types/HelloWorld.h" -#include "types/StringType.h" -#include "types/Data64kbType.h" -#include "types/Data1mbType.h" -#include +#include "BlackboxTests.hpp" + +#include "RTPSAsSocketReader.hpp" +#include "RTPSAsSocketWriter.hpp" +#include "RTPSWithRegistrationReader.hpp" +#include "RTPSWithRegistrationWriter.hpp" +#include "ReqRepAsReliableHelloWorldRequester.hpp" +#include "ReqRepAsReliableHelloWorldReplier.hpp" +#include "TCPReqRepHelloWorldRequester.hpp" +#include "TCPReqRepHelloWorldReplier.hpp" +#include "PubSubReader.hpp" +#include "PubSubWriter.hpp" +#include "PubSubWriterReader.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include #include +#include +#include +#include + +using namespace eprosima::fastrtps; +using namespace eprosima::fastrtps::rtps; -using eprosima::fastrtps::rtps::IPLocator; +uint16_t global_port = 0; -/****** Auxiliary print functions ******/ -template -void default_receive_print(const Type&) +#if HAVE_SECURITY +static const char* certs_path = nullptr; +#endif + +uint16_t get_port() { - std::cout << "Received data" << std::endl; + uint16_t port = static_cast(GET_PID()); + + if(5000 > port) + { + port += 5000; + } + + std::cout << "Generating port " << port << std::endl; + return port; } +/****** Auxiliary print functions ******/ template<> void default_receive_print(const HelloWorld& hello) { @@ -54,12 +88,6 @@ void default_receive_print(const Data1mb& data) std::cout << "Received Data1mb " << (uint16_t)data.data()[0] << std::endl;; } -template -void default_send_print(const Type&) -{ - std::cout << "Sent data" << std::endl; -} - template<> void default_send_print(const StringType&) { @@ -91,94 +119,8 @@ void default_send_print(const Data1mb& data) std::cout << "Sent Data1mb " << (uint16_t)data.data()[0] << std::endl;; } -#include "RTPSAsSocketReader.hpp" -#include "RTPSAsSocketWriter.hpp" -#include "RTPSWithRegistrationReader.hpp" -#include "RTPSWithRegistrationWriter.hpp" -#include "ReqRepAsReliableHelloWorldRequester.hpp" -#include "ReqRepAsReliableHelloWorldReplier.hpp" -#include "TCPReqRepHelloWorldRequester.hpp" -#include "TCPReqRepHelloWorldReplier.hpp" -#include "PubSubReader.hpp" -#include "PubSubWriter.hpp" -#include "PubSubWriterReader.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include - - -#include -#include -#include -#include -#include - -using namespace eprosima::fastrtps; -using namespace eprosima::fastrtps::rtps; - -#if defined(PREALLOCATED_WITH_REALLOC_MEMORY_MODE_TEST) -#define MEMORY_MODE_STRING ReallocMem -#define MEMORY_MODE_BYTE 1 -#elif defined(DYNAMIC_RESERVE_MEMORY_MODE_TEST) -#define MEMORY_MODE_STRING DynMem -#define MEMORY_MODE_BYTE 2 -#else -#define MEMORY_MODE_STRING PreallocMem -#define MEMORY_MODE_BYTE 3 -#endif - -#define PASTER(x, y) x ## _ ## y -#define EVALUATOR(x, y) PASTER(x, y) -#define BLACKBOXTEST(test_case_name, test_name) TEST(EVALUATOR(test_case_name, MEMORY_MODE_STRING), test_name) -#define BLACKBOXTEST_F(test_case_name, test_name) TEST_F(EVALUATOR(test_case_name, MEMORY_MODE_STRING), test_name) -#define TEST_TOPIC_NAME std::string(test_info_->test_case_name() + std::string("_") + test_info_->name()) - -uint16_t global_port = 0; - -#if HAVE_SECURITY -static const char* certs_path = nullptr; -#endif - -uint16_t get_port() -{ - uint16_t port = static_cast(GET_PID()); - - if(5000 > port) - { - port += 5000; - } - - std::cout << "Generating port " << port << std::endl; - return port; -} - -class BlackboxEnvironment : public ::testing::Environment -{ - public: - - void SetUp() - { - global_port = get_port(); - //Log::SetVerbosity(Log::Info); - //Log::SetCategoryFilter(std::regex("(SECURITY)")); - } - - void TearDown() - { - //Log::Reset(); - Log::KillThread(); - eprosima::fastrtps::rtps::RTPSDomain::stopAll(); - } -}; - /****** Auxiliary data generators *******/ -std::list default_helloworld_data_generator(size_t max = 0) +std::list default_helloworld_data_generator(size_t max) { uint16_t index = 1; size_t maximum = max ? max : 10; @@ -197,7 +139,7 @@ std::list default_helloworld_data_generator(size_t max = 0) return returnedValue; } -std::list default_large_string_data_generator(size_t max = 0) +std::list default_large_string_data_generator(size_t max) { uint16_t index = 1; size_t maximum = max ? max : 10; @@ -216,7 +158,7 @@ std::list default_large_string_data_generator(size_t max = 0) } const size_t data64kb_length = 63996; -std::list default_data64kb_data_generator(size_t max = 0) +std::list default_data64kb_data_generator(size_t max) { unsigned char index = 1; size_t maximum = max ? max : 10; @@ -238,7 +180,7 @@ std::list default_data64kb_data_generator(size_t max = 0) } const size_t data300kb_length = 307201; -std::list default_data300kb_data_generator(size_t max = 0) +std::list default_data300kb_data_generator(size_t max) { unsigned char index = 1; size_t maximum = max ? max : 10; @@ -259,7 +201,7 @@ std::list default_data300kb_data_generator(size_t max = 0) return returnedValue; } -std::list default_data300kb_mix_data_generator(size_t max = 0) +std::list default_data300kb_mix_data_generator(size_t max) { unsigned char index = 1; size_t maximum = max ? max : 10; @@ -302,18 +244,26 @@ const std::function default_data300kb_print = [](const Da { std::cout << (uint16_t)data.data()[0] << " "; }; +/***** End auxiliary lambda function *****/ - template -void print_non_received_messages(const std::list& data, const std::function& printer) +class BlackboxEnvironment : public ::testing::Environment { - if(data.size() != 0) - { - std::cout << "Samples not received: "; - std::for_each(data.begin(), data.end(), printer); - std::cout << std::endl; - } -} -/***** End auxiliary lambda function *****/ + public: + + void SetUp() + { + global_port = get_port(); + //Log::SetVerbosity(Log::Info); + //Log::SetCategoryFilter(std::regex("(SECURITY)")); + } + + void TearDown() + { + //Log::Reset(); + Log::KillThread(); + eprosima::fastrtps::rtps::RTPSDomain::stopAll(); + } +}; class EVALUATOR(BlackBoxPersistence, MEMORY_MODE_STRING) : public ::testing::Test { @@ -1463,7 +1413,7 @@ BLACKBOXTEST(BlackBox, PubSubAsNonReliableKeepLastReaderSmallDepth) size_t current_received = reader.block_for_at_least(2); reader.stopReception(); // Should be received only two samples. - ASSERT_EQ(current_received, 2); + ASSERT_EQ(current_received, 2u); data = reader.data_not_received(); } } @@ -1713,55 +1663,6 @@ BLACKBOXTEST(BlackBox, PubReliableKeepAllSubNonReliable) reader.block_for_at_least(2); } -//Verify that outLocatorList is used to select the desired output channel -BLACKBOXTEST(BlackBox, PubSubOutLocatorSelection){ - - PubSubReader reader(TEST_TOPIC_NAME); - PubSubWriter writer(TEST_TOPIC_NAME); - - LocatorList_t WriterOutLocators; - Locator_t LocatorBuffer; - - LocatorBuffer.kind = LOCATOR_KIND_UDPv4; - LocatorBuffer.port = 31337; - - WriterOutLocators.push_back(LocatorBuffer); - - - reader.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS). - history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS). - resource_limits_allocated_samples(2). - resource_limits_max_samples(2).init(); - - ASSERT_TRUE(reader.isInitialized()); - - std::shared_ptr descriptor = std::make_shared(); - descriptor->m_output_udp_socket = static_cast(LocatorBuffer.port); - - writer.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS). - durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS). - resource_limits_allocated_samples(20). - disable_builtin_transport(). - add_user_transport_to_pparams(descriptor). - resource_limits_max_samples(20).init(); - - - ASSERT_TRUE(writer.isInitialized()); - - // Because its volatile the durability - // Wait for discovery. - writer.wait_discovery(); - reader.wait_discovery(); - - auto data = default_helloworld_data_generator(); - - reader.startReception(data); - - writer.send(data); - ASSERT_TRUE(data.empty()); - reader.block_for_all(); -} - //Verify that Cachechanges are removed from History when the a Writer unmatches BLACKBOXTEST(BlackBox, StatefulReaderCacheChangeRelease){ PubSubReader reader(TEST_TOPIC_NAME); diff --git a/test/blackbox/BlackboxTests.hpp b/test/blackbox/BlackboxTests.hpp new file mode 100644 index 00000000000..ca82a77fa81 --- /dev/null +++ b/test/blackbox/BlackboxTests.hpp @@ -0,0 +1,125 @@ +// Copyright 2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef __BLACKBOX_BLACKBOXTESTS_HPP__ +#define __BLACKBOX_BLACKBOXTESTS_HPP__ + +#if defined(PREALLOCATED_WITH_REALLOC_MEMORY_MODE_TEST) +#define MEMORY_MODE_STRING ReallocMem +#define MEMORY_MODE_BYTE 1 +#elif defined(DYNAMIC_RESERVE_MEMORY_MODE_TEST) +#define MEMORY_MODE_STRING DynMem +#define MEMORY_MODE_BYTE 2 +#else +#define MEMORY_MODE_STRING PreallocMem +#define MEMORY_MODE_BYTE 3 +#endif + +#define PASTER(x, y) x ## _ ## y +#define EVALUATOR(x, y) PASTER(x, y) +#define BLACKBOXTEST(test_case_name, test_name) TEST(EVALUATOR(test_case_name, MEMORY_MODE_STRING), test_name) +#define BLACKBOXTEST_F(test_case_name, test_name) TEST_F(EVALUATOR(test_case_name, MEMORY_MODE_STRING), test_name) +#define TEST_TOPIC_NAME std::string(test_info_->test_case_name() + std::string("_") + test_info_->name()) + +#if defined(_WIN32) +#define GET_PID _getpid +#include +#else +#define GET_PID getpid +#include +#include +#endif + +#include "types/HelloWorldType.h" +#include "types/StringType.h" +#include "types/Data64kbType.h" +#include "types/Data1mbType.h" + +#include +#include +#include + +/****** Auxiliary print functions ******/ +template +void default_receive_print(const Type&) +{ + std::cout << "Received data" << std::endl; +} + +template<> +void default_receive_print(const HelloWorld& hello); + +template<> +void default_receive_print(const String& str); + +template<> +void default_receive_print(const Data64kb& data); + +template<> +void default_receive_print(const Data1mb& data); + +template +void default_send_print(const Type&) +{ + std::cout << "Sent data" << std::endl; +} + +template<> +void default_send_print(const StringType&); + +template<> +void default_send_print(const HelloWorld& hello); + +template<> +void default_send_print(const String& str); + +template<> +void default_send_print(const Data64kb& data); + +template<> +void default_send_print(const Data1mb& data); + +/****** Auxiliary data generators *******/ +std::list default_helloworld_data_generator(size_t max = 0); + +std::list default_large_string_data_generator(size_t max = 0); + +std::list default_data64kb_data_generator(size_t max = 0); + +std::list default_data300kb_data_generator(size_t max = 0); + +std::list default_data300kb_mix_data_generator(size_t max = 0); + +/****** Auxiliary lambda functions ******/ +extern const std::function default_helloworld_print; + +extern const std::function default_string_print; + +extern const std::function default_data64kb_print; + +extern const std::function default_data300kb_print; + +template +void print_non_received_messages(const std::list& data, const std::function& printer) +{ + if(data.size() != 0) + { + std::cout << "Samples not received: "; + std::for_each(data.begin(), data.end(), printer); + std::cout << std::endl; + } +} +/***** End auxiliary lambda function *****/ + +#endif // __BLACKBOX_BLACKBOXTESTS_HPP__ diff --git a/test/blackbox/CMakeLists.txt b/test/blackbox/CMakeLists.txt index 7071e0b314f..1884a1f89df 100644 --- a/test/blackbox/CMakeLists.txt +++ b/test/blackbox/CMakeLists.txt @@ -97,7 +97,10 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND) ############################################################################### # Unit tests ############################################################################### - set(BLACKBOXTESTS_SOURCE BlackboxTests.cpp + set(BLACKBOXTEST_TEST_SOURCE BlackboxTests.cpp + NetworkConf.cpp + ) + set(BLACKBOXTESTS_SOURCE ${BLACKBOXTEST_TEST_SOURCE} types/HelloWorld.cpp types/HelloWorldType.cpp types/String.cpp @@ -132,7 +135,7 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND) PREALLOCATED_MEMORY_MODE_TEST) target_include_directories(BlackboxTests_PreallocMem PRIVATE ${GTEST_INCLUDE_DIRS}) target_link_libraries(BlackboxTests_PreallocMem fastrtps fastcdr ${GTEST_LIBRARIES}) - add_blackbox_gtest(BlackboxTests_PreallocMem PreallocMem SOURCES ${BLACKBOXTESTS_SOURCE} + add_blackbox_gtest(BlackboxTests_PreallocMem PreallocMem SOURCES ${BLACKBOXTESTS_TEST_SOURCE} ENVIRONMENTS "CERTS_PATH=${PROJECT_SOURCE_DIR}/test/certs" "TOPIC_RANDOM_NUMBER=${TOPIC_RANDOM_NUMBER}" "W_UNICAST_PORT_RANDOM_NUMBER=${W_UNICAST_PORT_RANDOM_NUMBER}" @@ -145,7 +148,7 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND) PREALLOCATED_WITH_REALLOC_MEMORY_MODE_TEST) target_include_directories(BlackboxTests_ReallocMem PRIVATE ${GTEST_INCLUDE_DIRS}) target_link_libraries(BlackboxTests_ReallocMem fastrtps fastcdr ${GTEST_LIBRARIES}) - add_blackbox_gtest(BlackboxTests_ReallocMem ReallocMem SOURCES ${BLACKBOXTESTS_SOURCE} + add_blackbox_gtest(BlackboxTests_ReallocMem ReallocMem SOURCES ${BLACKBOXTESTS_TEST_SOURCE} ENVIRONMENTS "CERTS_PATH=${PROJECT_SOURCE_DIR}/test/certs" "TOPIC_RANDOM_NUMBER=${TOPIC_RANDOM_NUMBER}" "W_UNICAST_PORT_RANDOM_NUMBER=${W_UNICAST_PORT_RANDOM_NUMBER}" @@ -158,7 +161,7 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND) DYNAMIC_RESERVE_MEMORY_MODE_TEST) target_include_directories(BlackboxTests_DynMem PRIVATE ${GTEST_INCLUDE_DIRS}) target_link_libraries(BlackboxTests_DynMem fastrtps fastcdr ${GTEST_LIBRARIES}) - add_blackbox_gtest(BlackboxTests_DynMem DynMem SOURCES ${BLACKBOXTESTS_SOURCE} + add_blackbox_gtest(BlackboxTests_DynMem DynMem SOURCES ${BLACKBOXTESTS_TEST_SOURCE} ENVIRONMENTS "CERTS_PATH=${PROJECT_SOURCE_DIR}/test/certs" "TOPIC_RANDOM_NUMBER=${TOPIC_RANDOM_NUMBER}" "W_UNICAST_PORT_RANDOM_NUMBER=${W_UNICAST_PORT_RANDOM_NUMBER}" diff --git a/test/blackbox/NetworkConf.cpp b/test/blackbox/NetworkConf.cpp new file mode 100644 index 00000000000..d5a6d8129d1 --- /dev/null +++ b/test/blackbox/NetworkConf.cpp @@ -0,0 +1,164 @@ +// Copyright 2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "BlackboxTests.hpp" +#include "PubSubReader.hpp" +#include "PubSubWriter.hpp" + +#include +#include +#include + +#include + +using namespace eprosima::fastrtps; +using namespace eprosima::fastrtps::rtps; + +static void GetIP4s(std::vector& interfaces) +{ + IPFinder::getIPs(&interfaces, false); + auto new_end = remove_if(interfaces.begin(), + interfaces.end(), + [](IPFinder::info_IP ip){return ip.type != IPFinder::IP4 && ip.type != IPFinder::IP4_LOCAL;}); + interfaces.erase(new_end, interfaces.end()); + std::for_each(interfaces.begin(), interfaces.end(), [](auto&& loc) + { + loc.locator.kind = LOCATOR_KIND_UDPv4; + }); +} + +//Verify that outLocatorList is used to select the desired output channel +BLACKBOXTEST(BlackBox, PubSubOutLocatorSelection) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + LocatorList_t WriterOutLocators; + Locator_t LocatorBuffer; + + LocatorBuffer.kind = LOCATOR_KIND_UDPv4; + LocatorBuffer.port = 31337; + + WriterOutLocators.push_back(LocatorBuffer); + + + reader.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS). + history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS). + resource_limits_allocated_samples(2). + resource_limits_max_samples(2).init(); + + ASSERT_TRUE(reader.isInitialized()); + + std::shared_ptr descriptor = std::make_shared(); + descriptor->m_output_udp_socket = static_cast(LocatorBuffer.port); + + writer.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS). + durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS). + resource_limits_allocated_samples(20). + disable_builtin_transport(). + add_user_transport_to_pparams(descriptor). + resource_limits_max_samples(20).init(); + + + ASSERT_TRUE(writer.isInitialized()); + + // Because its volatile the durability + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_helloworld_data_generator(); + + reader.startReception(data); + + writer.send(data); + ASSERT_TRUE(data.empty()); + reader.block_for_all(); +} + +BLACKBOXTEST(BlackBox, PubSubInterfaceWhitelistLocalhost) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + std::shared_ptr descriptor = std::make_shared(); + descriptor->interfaceWhiteList.push_back("127.0.0.1"); + + reader.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).history_depth(10). + disable_multicast(0). + disable_builtin_transport(). + add_user_transport_to_pparams(descriptor).init(); + + ASSERT_TRUE(reader.isInitialized()); + + writer.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).history_depth(10). + disable_multicast(1). + disable_builtin_transport(). + add_user_transport_to_pparams(descriptor).init(); + + ASSERT_TRUE(writer.isInitialized()); + + // Because its volatile the durability + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_helloworld_data_generator(); + + reader.startReception(data); + + writer.send(data); + ASSERT_TRUE(data.empty()); + reader.block_for_all(); +} + +BLACKBOXTEST(BlackBox, PubSubInterfaceWhitelistUnicast) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + std::vector interfaces; + GetIP4s(interfaces); + + std::shared_ptr descriptor = std::make_shared(); + for(const auto& interface : interfaces) + { + descriptor->interfaceWhiteList.push_back(interface.name); + } + + reader.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).history_depth(10). + disable_builtin_transport(). + add_user_transport_to_pparams(descriptor).init(); + + ASSERT_TRUE(reader.isInitialized()); + + writer.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).history_depth(10). + disable_builtin_transport(). + add_user_transport_to_pparams(descriptor).init(); + + ASSERT_TRUE(writer.isInitialized()); + + // Because its volatile the durability + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_helloworld_data_generator(); + + reader.startReception(data); + + writer.send(data); + ASSERT_TRUE(data.empty()); + reader.block_for_all(); +} diff --git a/test/unittest/transport/UDPv4Tests.cpp b/test/unittest/transport/UDPv4Tests.cpp index 98e5f1bf492..1da2c3419ae 100644 --- a/test/unittest/transport/UDPv4Tests.cpp +++ b/test/unittest/transport/UDPv4Tests.cpp @@ -52,6 +52,20 @@ uint16_t get_port() return port; } +static void GetIP4s(std::vector& interfaces) +{ + IPFinder::getIPs(&interfaces, false); + auto new_end = remove_if(interfaces.begin(), + interfaces.end(), + [](IPFinder::info_IP ip){return ip.type != IPFinder::IP4 && ip.type != IPFinder::IP4_LOCAL;}); + interfaces.erase(new_end, interfaces.end()); + std::for_each(interfaces.begin(), interfaces.end(), [](auto&& loc) + { + loc.locator.kind = LOCATOR_KIND_UDPv4; + }); +} + + class UDPv4Tests: public ::testing::Test { public: @@ -355,27 +369,28 @@ TEST_F(UDPv4Tests, send_to_allowed_interface) } } #ifndef __APPLE__ -TEST_F(UDPv4Tests, send_and_receive_between_allowed_sockets) + +TEST_F(UDPv4Tests, send_and_receive_between_allowed_sockets_using_localhost) { descriptor.interfaceWhiteList.emplace_back("127.0.0.1"); UDPv4Transport transportUnderTest(descriptor); transportUnderTest.init(); - Locator_t multicastLocator; - multicastLocator.port = g_default_port; - multicastLocator.kind = LOCATOR_KIND_UDPv4; - IPLocator::setIPv4(multicastLocator, "127.0.0.1"); + Locator_t unicastLocator; + unicastLocator.port = g_default_port; + unicastLocator.kind = LOCATOR_KIND_UDPv4; + IPLocator::setIPv4(unicastLocator, "127.0.0.1"); Locator_t outputChannelLocator; - outputChannelLocator.port = g_default_port; + outputChannelLocator.port = g_default_port + 1; outputChannelLocator.kind = LOCATOR_KIND_UDPv4; - IPLocator::setIPv4(outputChannelLocator, 239, 255, 1, 4); + IPLocator::setIPv4(outputChannelLocator, "127.0.0.1"); - MockReceiverResource receiver(transportUnderTest, multicastLocator); + MockReceiverResource receiver(transportUnderTest, unicastLocator); MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); ASSERT_TRUE(transportUnderTest.OpenOutputChannel(outputChannelLocator)); // Includes loopback - ASSERT_TRUE(transportUnderTest.IsInputChannelOpen(multicastLocator)); + ASSERT_TRUE(transportUnderTest.IsInputChannelOpen(unicastLocator)); octet message[5] = { 'H','e','l','l','o' }; Semaphore sem; @@ -389,7 +404,57 @@ TEST_F(UDPv4Tests, send_and_receive_between_allowed_sockets) auto sendThreadFunction = [&]() { - EXPECT_TRUE(transportUnderTest.Send(message, 5, outputChannelLocator, multicastLocator)); + EXPECT_TRUE(transportUnderTest.Send(message, 5, outputChannelLocator, unicastLocator)); + }; + + senderThread.reset(new std::thread(sendThreadFunction)); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + senderThread->join(); + sem.wait(); + ASSERT_TRUE(transportUnderTest.CloseOutputChannel(outputChannelLocator)); +} + +TEST_F(UDPv4Tests, send_and_receive_between_allowed_sockets_using_unicast) +{ + std::vector interfaces; + GetIP4s(interfaces); + + for(const auto& interface : interfaces) + { + descriptor.interfaceWhiteList.push_back(interface.name); + } + UDPv4Transport transportUnderTest(descriptor); + transportUnderTest.init(); + + Locator_t unicastLocator; + unicastLocator.port = g_default_port; + unicastLocator.kind = LOCATOR_KIND_UDPv4; + IPLocator::setIPv4(unicastLocator, interfaces.at(0).name); + + Locator_t outputChannelLocator; + outputChannelLocator.port = g_default_port + 1; + outputChannelLocator.kind = LOCATOR_KIND_UDPv4; + IPLocator::setIPv4(outputChannelLocator, interfaces.at(0).name); + + MockReceiverResource receiver(transportUnderTest, unicastLocator); + MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); + + ASSERT_TRUE(transportUnderTest.OpenOutputChannel(outputChannelLocator)); // Includes loopback + ASSERT_TRUE(transportUnderTest.IsInputChannelOpen(unicastLocator)); + octet message[5] = { 'H','e','l','l','o' }; + + Semaphore sem; + std::function recCallback = [&]() + { + EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); + sem.post(); + }; + + msg_recv->setCallback(recCallback); + + auto sendThreadFunction = [&]() + { + EXPECT_TRUE(transportUnderTest.Send(message, 5, outputChannelLocator, unicastLocator)); }; senderThread.reset(new std::thread(sendThreadFunction)); @@ -399,6 +464,57 @@ TEST_F(UDPv4Tests, send_and_receive_between_allowed_sockets) ASSERT_TRUE(transportUnderTest.CloseOutputChannel(outputChannelLocator)); } +TEST_F(UDPv4Tests, send_and_receive_between_allowed_sockets_using_unicast_to_multicast) +{ + std::vector interfaces; + GetIP4s(interfaces); + + for(const auto& interface : interfaces) + { + descriptor.interfaceWhiteList.push_back(interface.name); + } + UDPv4Transport transportUnderTest(descriptor); + transportUnderTest.init(); + + Locator_t unicastLocator; + unicastLocator.port = g_default_port; + unicastLocator.kind = LOCATOR_KIND_UDPv4; + IPLocator::setIPv4(unicastLocator, "239.255.1.4"); + + Locator_t outputChannelLocator; + outputChannelLocator.port = g_default_port + 1; + outputChannelLocator.kind = LOCATOR_KIND_UDPv4; + IPLocator::setIPv4(outputChannelLocator, interfaces.at(0).name); + + MockReceiverResource receiver(transportUnderTest, unicastLocator); + MockMessageReceiver *msg_recv = dynamic_cast(receiver.CreateMessageReceiver()); + + ASSERT_TRUE(transportUnderTest.OpenOutputChannel(outputChannelLocator)); // Includes loopback + ASSERT_TRUE(transportUnderTest.IsInputChannelOpen(unicastLocator)); + octet message[5] = { 'H','e','l','l','o' }; + + Semaphore sem; + std::function recCallback = [&]() + { + EXPECT_EQ(memcmp(message, msg_recv->data, 5), 0); + sem.post(); + }; + + msg_recv->setCallback(recCallback); + + auto sendThreadFunction = [&]() + { + EXPECT_TRUE(transportUnderTest.Send(message, 5, outputChannelLocator, unicastLocator)); + }; + + senderThread.reset(new std::thread(sendThreadFunction)); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + senderThread->join(); + sem.wait(); + ASSERT_TRUE(transportUnderTest.CloseOutputChannel(outputChannelLocator)); +} +#endif + TEST_F(UDPv4Tests, open_a_blocked_socket) { descriptor.interfaceWhiteList.emplace_back("111.111.111.111"); @@ -413,7 +529,6 @@ TEST_F(UDPv4Tests, open_a_blocked_socket) MockReceiverResource receiver(transportUnderTest, multicastLocator); ASSERT_FALSE(transportUnderTest.IsInputChannelOpen(multicastLocator)); } -#endif TEST_F(UDPv4Tests, shrink_locator_lists) {