Skip to content

Commit

Permalink
Revert "TCP deadlock on channel reuse (#4099)" (#4181)
Browse files Browse the repository at this point in the history
* Revert "TCP deadlock on channel reuse (#4099)"

This reverts commit dd4c434.

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

* Refs #20055: Separate builtin transports tests into individual cases

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

* Refs #20055: Mark large_data tests as flaky due to TCP

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

---------

Signed-off-by: EduPonz <eduardoponz@eprosima.com>
(cherry picked from commit 5e87eb3)

# Conflicts:
#	test/blackbox/common/BlackboxTestsTransportCustom.cpp
#	test/blackbox/xfail_tests.cmake
  • Loading branch information
EduPonz authored and mergify[bot] committed Dec 22, 2023
1 parent ee4231e commit 3061995
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 56 deletions.
33 changes: 7 additions & 26 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,21 +222,6 @@ void TCPTransportInterface::bind_socket(
auto it_remove = std::find(unbound_channel_resources_.begin(), unbound_channel_resources_.end(), channel);
assert(it_remove != unbound_channel_resources_.end());
unbound_channel_resources_.erase(it_remove);

unbound_lock.unlock();

// Look for an existing channel that matches this physical locator
auto existing_channel = channel_resources_.find(channel->locator());
// If the channel exists, check if the channel reference wait until it finishes its tasks
if (existing_channel != channel_resources_.end())
{
// Disconnect the old channel
existing_channel->second->disconnect();
scopedLock.unlock();
existing_channel->second->clear();
scopedLock.lock();
}

channel_resources_[channel->locator()] = channel;

}
Expand Down Expand Up @@ -656,6 +641,9 @@ bool TCPTransportInterface::OpenOutputChannel(
if (existing_channel != channel_resources_.end() &&
existing_channel->second != tcp_sender_resource->channel())
{
// Disconnect the old channel
tcp_sender_resource->channel()->disconnect();
tcp_sender_resource->channel()->clear();
// Update sender resource with new channel
tcp_sender_resource->channel() = existing_channel->second;
}
Expand Down Expand Up @@ -875,17 +863,10 @@ void TCPTransportInterface::perform_listen_operation(
{
TransportReceiverInterface* receiver = it->second.first;
ReceiverInUseCV* receiver_in_use = it->second.second;
receiver_in_use->cv.wait(scopedLock, [&]()
{
return receiver_in_use->in_use == false;
});
if (TCPChannelResource::eConnectionStatus::eConnecting < channel->connection_status())
{
receiver_in_use->in_use = true;
scopedLock.unlock();
receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator);
scopedLock.lock();
}
receiver_in_use->in_use = true;
scopedLock.unlock();
receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator);
scopedLock.lock();
receiver_in_use->in_use = false;
receiver_in_use->cv.notify_one();
}
Expand Down
315 changes: 311 additions & 4 deletions test/blackbox/common/BlackboxTestsTransportCustom.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@

#include "BlackboxTests.hpp"

#include "PubSubReader.hpp"
#include "PubSubWriter.hpp"
#include <string>

#include <gtest/gtest.h>

#include <fastdds/rtps/transport/ChainingTransportDescriptor.h>
#include <fastdds/rtps/transport/ChainingTransport.h>
#include <fastdds/rtps/attributes/PropertyPolicy.h>
#include <fastdds/rtps/transport/TCPv4TransportDescriptor.h>

#include <gtest/gtest.h>
#include "PubSubReader.hpp"
#include "PubSubWriter.hpp"

class TestChainingTransportDescriptor : public eprosima::fastdds::rtps::ChainingTransportDescriptor
{
Expand Down Expand Up @@ -112,6 +114,181 @@ eprosima::fastdds::rtps::TransportInterface* TestChainingTransportDescriptor::cr
return new TestChainingTransport(*this);
}

class BuiltinTransportsTest
{
public:

static void test_xml(
const std::string& profiles_file,
const std::string& participant_profile)
{
run_test(profiles_file, participant_profile, "", BuiltinTransports::NONE);
}

static void test_env(
const std::string& env_var_value)
{
if (env_var_value == "NONE")
{
#ifdef _WIN32
_putenv_s(env_var_name_.c_str(), env_var_value.c_str());
#else
setenv(env_var_name_.c_str(), env_var_value.c_str(), 1);
#endif // _WIN32

PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);

writer.init();
ASSERT_FALSE(writer.isInitialized());

reader.init();
ASSERT_FALSE(reader.isInitialized());

}
else
{
run_test("", "", env_var_value, BuiltinTransports::NONE);
}
}

static void test_api(
const BuiltinTransports& builtin_transports)
{
if (builtin_transports == BuiltinTransports::NONE)
{
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);

writer.setup_transports(builtin_transports).init();
ASSERT_FALSE(writer.isInitialized());

reader.setup_transports(builtin_transports).init();
ASSERT_FALSE(reader.isInitialized());
}
else
{
run_test("", "", "", builtin_transports);
}
}

private:

static void run_test(
const std::string& profiles_file,
const std::string& participant_profile,
const std::string& env_var_value,
const BuiltinTransports& builtin_transports)
{
enum class BuiltinTransportsTestCase : uint8_t
{
NONE,
XML,
ENV,
API
};

BuiltinTransportsTestCase test_case = BuiltinTransportsTestCase::NONE;

/* Validate input */
if (profiles_file != "")
{
ASSERT_NE(participant_profile, "");
ASSERT_EQ(builtin_transports, BuiltinTransports::NONE);
ASSERT_EQ(env_var_value, "");
test_case = BuiltinTransportsTestCase::XML;
}
else if (env_var_value != "")
{
ASSERT_EQ(profiles_file, "");
ASSERT_EQ(participant_profile, "");
ASSERT_EQ(builtin_transports, BuiltinTransports::NONE);
test_case = BuiltinTransportsTestCase::ENV;
}
else if (builtin_transports != BuiltinTransports::NONE)
{
ASSERT_EQ(profiles_file, "");
ASSERT_EQ(participant_profile, "");
ASSERT_EQ(env_var_value, "");
test_case = BuiltinTransportsTestCase::API;
}

ASSERT_NE(test_case, BuiltinTransportsTestCase::NONE);

/* Test configuration */
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);

// Reliable keep all to wait of all acked as end condition
writer.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS);

reader.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS);

// Builtin transport configuration according to test_case
switch (test_case)
{
case BuiltinTransportsTestCase::XML:
{
writer.set_xml_filename(profiles_file);
writer.set_participant_profile(participant_profile);

reader.set_xml_filename(profiles_file);
reader.set_participant_profile(participant_profile);
break;
}
case BuiltinTransportsTestCase::ENV:
{
#ifdef _WIN32
_putenv_s(env_var_name_.c_str(), env_var_name_.c_str());
#else
setenv(env_var_name_.c_str(), env_var_name_.c_str(), 1);
#endif // _WIN32
break;
}
case BuiltinTransportsTestCase::API:
{
writer.setup_transports(builtin_transports);
reader.setup_transports(builtin_transports);
break;
}
default:
{
FAIL();
}
}

/* Run test */
// Init writer
writer.init();
ASSERT_TRUE(writer.isInitialized());

// Init reader
reader.init();
ASSERT_TRUE(reader.isInitialized());

// Wait for discovery
writer.wait_discovery();
reader.wait_discovery();

// Send data
auto data = default_helloworld_data_generator();
reader.startReception(data);
writer.send(data);
ASSERT_TRUE(data.empty());

// Wait for reception acknowledgement
reader.block_for_all();
EXPECT_TRUE(writer.waitForAllAcked(std::chrono::seconds(3)));
}

static const std::string env_var_name_;
};

// Static const member of non-integral types cannot be in-class initialized
const std::string BuiltinTransportsTest::env_var_name_ = "FASTDDS_BUILTIN_TRANSPORTS";

TEST(ChainingTransportTests, basic_test)
{
bool writer_init_function_called = false;
Expand Down Expand Up @@ -305,4 +482,134 @@ TEST(ChainingTransportTests, tcp_client_server_with_wan_correct_sender_resources
//! is being created
ASSERT_LE(times_writer_send_function_called.load(), 30);
ASSERT_LE(times_reader_receive_function_called.load(), 30);
}
<<<<<<< HEAD
}
=======
}

TEST(ChainingTransportTests, builtin_transports_api_none)
{
BuiltinTransportsTest::test_api(BuiltinTransports::NONE);
}

TEST(ChainingTransportTests, builtin_transports_api_default)
{
BuiltinTransportsTest::test_api(BuiltinTransports::DEFAULT);
}

TEST(ChainingTransportTests, builtin_transports_api_defaultv6)
{
BuiltinTransportsTest::test_api(BuiltinTransports::DEFAULTv6);
}

TEST(ChainingTransportTests, builtin_transports_api_shm)
{
BuiltinTransportsTest::test_api(BuiltinTransports::SHM);
}

TEST(ChainingTransportTests, builtin_transports_api_udpv4)
{
BuiltinTransportsTest::test_api(BuiltinTransports::UDPv4);
}

TEST(ChainingTransportTests, builtin_transports_api_udpv6)
{
BuiltinTransportsTest::test_api(BuiltinTransports::UDPv6);
}

TEST(ChainingTransportTests, builtin_transports_api_large_data)
{
BuiltinTransportsTest::test_api(BuiltinTransports::LARGE_DATA);
}

#ifndef __APPLE__
TEST(ChainingTransportTests, builtin_transports_api_large_datav6)
{
BuiltinTransportsTest::test_api(BuiltinTransports::LARGE_DATAv6);
}
#endif // __APPLE__

TEST(ChainingTransportTests, builtin_transports_env_none)
{
BuiltinTransportsTest::test_env("NONE");
}

TEST(ChainingTransportTests, builtin_transports_env_default)
{
BuiltinTransportsTest::test_env("DEFAULT");
}

TEST(ChainingTransportTests, builtin_transports_env_defaultv6)
{
BuiltinTransportsTest::test_env("DEFAULTv6");
}

TEST(ChainingTransportTests, builtin_transports_env_shm)
{
BuiltinTransportsTest::test_env("SHM");
}

TEST(ChainingTransportTests, builtin_transports_env_udpv4)
{
BuiltinTransportsTest::test_env("UDPv4");
}

TEST(ChainingTransportTests, builtin_transports_env_udpv6)
{
BuiltinTransportsTest::test_env("UDPv6");
}

TEST(ChainingTransportTests, builtin_transports_env_large_data)
{
BuiltinTransportsTest::test_env("LARGE_DATA");
}

#ifndef __APPLE__
TEST(ChainingTransportTests, builtin_transports_env_large_datav6)
{
BuiltinTransportsTest::test_env("LARGE_DATAv6");
}
#endif // __APPLE__

TEST(ChainingTransportTests, builtin_transports_xml_none)
{
BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_none");
}

TEST(ChainingTransportTests, builtin_transports_xml_default)
{
BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_default");
}

TEST(ChainingTransportTests, builtin_transports_xml_defaultv6)
{
BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_defaultv6");
}

TEST(ChainingTransportTests, builtin_transports_xml_shm)
{
BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_shm");
}

TEST(ChainingTransportTests, builtin_transports_xml_udpv4)
{
BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_udp");
}

TEST(ChainingTransportTests, builtin_transports_xml_udpv6)
{
BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_udpv6");
}

TEST(ChainingTransportTests, builtin_transports_xml_large_data)
{
BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_largedata");
}

#ifndef __APPLE__
TEST(ChainingTransportTests, builtin_transports_xml_large_datav6)
{
BuiltinTransportsTest::test_xml("builtin_transports_profile.xml", "participant_largedatav6");
}
#endif // __APPLE__
>>>>>>> 5e87eb3ac (Revert "TCP deadlock on channel reuse (#4099)" (#4181))
Loading

0 comments on commit 3061995

Please sign in to comment.