diff --git a/include/envoy/event/dispatcher.h b/include/envoy/event/dispatcher.h index 9802ce3c64e8..9dff30534136 100644 --- a/include/envoy/event/dispatcher.h +++ b/include/envoy/event/dispatcher.h @@ -110,6 +110,14 @@ class Dispatcher { Network::ListenerCallbacks& cb, bool bind_to_port, bool hand_off_restored_destination_connections) PURE; + /** + * Create a logical udp listener on a specific port. + * @param socket supplies the socket to listen on. + * @param cb supplies the udp listener callbacks to invoke for listener events. + * @return Network::ListenerPtr a new listener that is owned by the caller. + */ + virtual Network::ListenerPtr createUdpListener(Network::Socket& socket, + Network::UdpListenerCallbacks& cb) PURE; /** * Allocate a timer. @see Timer for docs on how to use the timer. * @param cb supplies the callback to invoke when the timer fires. diff --git a/include/envoy/network/listener.h b/include/envoy/network/listener.h index ae34c3151117..06a9d7cc2263 100644 --- a/include/envoy/network/listener.h +++ b/include/envoy/network/listener.h @@ -114,6 +114,59 @@ class ListenerCallbacks { virtual void onNewConnection(ConnectionPtr&& new_connection) PURE; }; +/** + * Utility struct that encapsulates the information from a udp socket's + * recvfrom/recvmmsg call. + * + * TODO(conqerAtapple): Maybe this belongs inside the UdpListenerCallbacks + * class. + */ +struct UdpData { + Address::InstanceConstSharedPtr local_address_; + Address::InstanceConstSharedPtr peer_address_; // TODO(conquerAtapple): Fix ownership semantics. + Buffer::InstancePtr buffer_; + // TODO(conquerAtapple): + // Add UdpReader here so that the callback handler can + // then use the reader to do multiple reads(recvmmsg) once the OS notifies it + // has data. We could also just return a `ReaderFactory` that returns either a + // `recvfrom` reader (with peer information) or a `read/recvmmsg` reader. This + // is still being flushed out (Jan, 2019). +}; + +/** + * Udp listener callbacks. + */ +class UdpListenerCallbacks { +public: + enum class ErrorCode { SyscallError, UnknownError }; + + virtual ~UdpListenerCallbacks() = default; + + /** + * Called whenever data is received by the underlying udp socket. + * + * @param data UdpData from the underlying socket. + */ + virtual void onData(const UdpData& data) PURE; + + /** + * Called when the underlying socket is ready for write. + * + * @param socket Underlying server socket for the listener. + * + * TODO(conqerAtapple): Maybe we need a UdpWriter here instead of Socket. + */ + virtual void onWriteReady(const Socket& socket) PURE; + + /** + * Called when there is an error event. + * + * @param error_code ErrorCode for the error event. + * @param error_number System error number. + */ + virtual void onError(const ErrorCode& error_code, int error_number) PURE; +}; + /** * An abstract socket listener. Free the listener to stop listening on the socket. */ diff --git a/source/common/event/dispatcher_impl.cc b/source/common/event/dispatcher_impl.cc index be1134964e91..ced376e638dc 100644 --- a/source/common/event/dispatcher_impl.cc +++ b/source/common/event/dispatcher_impl.cc @@ -19,6 +19,7 @@ #include "common/network/connection_impl.h" #include "common/network/dns_impl.h" #include "common/network/listener_impl.h" +#include "common/network/udp_listener_impl.h" #include "event2/event.h" @@ -119,6 +120,12 @@ DispatcherImpl::createListener(Network::Socket& socket, Network::ListenerCallbac hand_off_restored_destination_connections)}; } +Network::ListenerPtr DispatcherImpl::createUdpListener(Network::Socket& socket, + Network::UdpListenerCallbacks& cb) { + ASSERT(isThreadSafe()); + return Network::ListenerPtr{new Network::UdpListenerImpl(*this, socket, cb)}; +} + TimerPtr DispatcherImpl::createTimer(TimerCb cb) { ASSERT(isThreadSafe()); return scheduler_->createTimer(cb); diff --git a/source/common/event/dispatcher_impl.h b/source/common/event/dispatcher_impl.h index f8edabdc7218..c13108809df8 100644 --- a/source/common/event/dispatcher_impl.h +++ b/source/common/event/dispatcher_impl.h @@ -51,6 +51,8 @@ class DispatcherImpl : Logger::Loggable, public Dispatcher { Network::ListenerPtr createListener(Network::Socket& socket, Network::ListenerCallbacks& cb, bool bind_to_port, bool hand_off_restored_destination_connections) override; + Network::ListenerPtr createUdpListener(Network::Socket& socket, + Network::UdpListenerCallbacks& cb) override; TimerPtr createTimer(TimerCb cb) override; void deferredDelete(DeferredDeletablePtr&& to_delete) override; void exit() override; diff --git a/source/common/network/BUILD b/source/common/network/BUILD index 6d39b662a740..5f356c4f7388 100644 --- a/source/common/network/BUILD +++ b/source/common/network/BUILD @@ -139,9 +139,15 @@ envoy_cc_library( envoy_cc_library( name = "listener_lib", srcs = [ + "base_listener_impl.cc", "listener_impl.cc", + "udp_listener_impl.cc", + ], + hdrs = [ + "base_listener_impl.h", + "listener_impl.h", + "udp_listener_impl.h", ], - hdrs = ["listener_impl.h"], deps = [ ":address_lib", ":io_socket_handle_lib", @@ -151,6 +157,7 @@ envoy_cc_library( "//include/envoy/network:listener_interface", "//include/envoy/stats:stats_interface", "//include/envoy/stats:stats_macros", + "//source/common/buffer:buffer_lib", "//source/common/common:assert_lib", "//source/common/common:empty_string", "//source/common/common:linked_object", diff --git a/source/common/network/base_listener_impl.cc b/source/common/network/base_listener_impl.cc new file mode 100644 index 000000000000..7340dd693bc5 --- /dev/null +++ b/source/common/network/base_listener_impl.cc @@ -0,0 +1,35 @@ +#include "common/network/base_listener_impl.h" + +#include + +#include "envoy/common/exception.h" + +#include "common/common/assert.h" +#include "common/common/empty_string.h" +#include "common/common/fmt.h" +#include "common/event/dispatcher_impl.h" +#include "common/event/file_event_impl.h" +#include "common/network/address_impl.h" + +#include "event2/listener.h" + +namespace Envoy { +namespace Network { + +Address::InstanceConstSharedPtr BaseListenerImpl::getLocalAddress(int fd) { + return Address::addressFromFd(fd); +} + +BaseListenerImpl::BaseListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket) + : local_address_(nullptr), dispatcher_(dispatcher), socket_(socket) { + const auto ip = socket.localAddress()->ip(); + + // Only use the listen socket's local address for new connections if it is not the all hosts + // address (e.g., 0.0.0.0 for IPv4). + if (!(ip && ip->isAnyAddress())) { + local_address_ = socket.localAddress(); + } +} + +} // namespace Network +} // namespace Envoy diff --git a/source/common/network/base_listener_impl.h b/source/common/network/base_listener_impl.h new file mode 100644 index 000000000000..454e68530041 --- /dev/null +++ b/source/common/network/base_listener_impl.h @@ -0,0 +1,30 @@ +#pragma once + +#include "envoy/network/listener.h" + +#include "common/event/dispatcher_impl.h" +#include "common/event/libevent.h" +#include "common/network/listen_socket_impl.h" + +#include "event2/event.h" + +namespace Envoy { +namespace Network { + +/** + * Base libevent implementation of Network::Listener. + */ +class BaseListenerImpl : public Listener { +public: + BaseListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket); + +protected: + virtual Address::InstanceConstSharedPtr getLocalAddress(int fd); + + Address::InstanceConstSharedPtr local_address_; + Event::DispatcherImpl& dispatcher_; + Socket& socket_; +}; + +} // namespace Network +} // namespace Envoy diff --git a/source/common/network/listener_impl.cc b/source/common/network/listener_impl.cc index 4493f4f790fc..998b52c25c46 100644 --- a/source/common/network/listener_impl.cc +++ b/source/common/network/listener_impl.cc @@ -17,10 +17,6 @@ namespace Envoy { namespace Network { -Address::InstanceConstSharedPtr ListenerImpl::getLocalAddress(int fd) { - return Address::addressFromFd(fd); -} - void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr, int remote_addr_len, void* arg) { ListenerImpl* listener = static_cast(arg); @@ -51,35 +47,31 @@ void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* listener->hand_off_restored_destination_connections_); } -ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb, - bool bind_to_port, bool hand_off_restored_destination_connections) - : local_address_(nullptr), cb_(cb), - hand_off_restored_destination_connections_(hand_off_restored_destination_connections), - listener_(nullptr) { - const auto ip = socket.localAddress()->ip(); +void ListenerImpl::setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket) { + listener_.reset( + evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd())); - // Only use the listen socket's local address for new connections if it is not the all hosts - // address (e.g., 0.0.0.0 for IPv4). - if (!(ip && ip->isAnyAddress())) { - local_address_ = socket.localAddress(); + if (!listener_) { + throw CreateListenerException( + fmt::format("cannot listen on socket: {}", socket.localAddress()->asString())); } - if (bind_to_port) { - listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, - socket.ioHandle().fd())); - - if (!listener_) { - throw CreateListenerException( - fmt::format("cannot listen on socket: {}", socket.localAddress()->asString())); - } + if (!Network::Socket::applyOptions(socket.options(), socket, + envoy::api::v2::core::SocketOption::STATE_LISTENING)) { + throw CreateListenerException(fmt::format("cannot set post-listen socket option on socket: {}", + socket.localAddress()->asString())); + } - if (!Network::Socket::applyOptions(socket.options(), socket, - envoy::api::v2::core::SocketOption::STATE_LISTENING)) { - throw CreateListenerException(fmt::format( - "cannot set post-listen socket option on socket: {}", socket.localAddress()->asString())); - } + evconnlistener_set_error_cb(listener_.get(), errorCallback); +} - evconnlistener_set_error_cb(listener_.get(), errorCallback); +ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb, + bool bind_to_port, bool hand_off_restored_destination_connections) + : BaseListenerImpl(dispatcher, socket), cb_(cb), + hand_off_restored_destination_connections_(hand_off_restored_destination_connections), + listener_(nullptr) { + if (bind_to_port) { + setupServerSocket(dispatcher, socket); } } diff --git a/source/common/network/listener_impl.h b/source/common/network/listener_impl.h index 26dff27ce24f..d1df0ad7c162 100644 --- a/source/common/network/listener_impl.h +++ b/source/common/network/listener_impl.h @@ -1,38 +1,32 @@ #pragma once -#include "envoy/network/listener.h" - -#include "common/event/dispatcher_impl.h" -#include "common/event/libevent.h" -#include "common/network/listen_socket_impl.h" - -#include "event2/event.h" +#include "base_listener_impl.h" namespace Envoy { namespace Network { /** - * libevent implementation of Network::Listener. + * libevent implementation of Network::Listener for TCP. + * TODO(conqerAtapple): Consider renaming the class to `TcpListenerImpl`. */ -class ListenerImpl : public Listener { +class ListenerImpl : public BaseListenerImpl { public: ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb, bool bind_to_port, bool hand_off_restored_destination_connections); - void disable(); - void enable(); + void disable() override; + void enable() override; protected: - virtual Address::InstanceConstSharedPtr getLocalAddress(int fd); + void setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket); - Address::InstanceConstSharedPtr local_address_; ListenerCallbacks& cb_; const bool hand_off_restored_destination_connections_; private: - static void errorCallback(evconnlistener* listener, void* context); static void listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr, int remote_addr_len, void* arg); + static void errorCallback(evconnlistener* listener, void* context); Event::Libevent::ListenerPtr listener_; }; diff --git a/source/common/network/udp_listener_impl.cc b/source/common/network/udp_listener_impl.cc new file mode 100644 index 000000000000..099889665148 --- /dev/null +++ b/source/common/network/udp_listener_impl.cc @@ -0,0 +1,164 @@ +#include "common/network/udp_listener_impl.h" + +#include + +#include "envoy/buffer/buffer.h" +#include "envoy/common/exception.h" + +#include "common/common/assert.h" +#include "common/common/empty_string.h" +#include "common/common/fmt.h" +#include "common/event/dispatcher_impl.h" +#include "common/network/address_impl.h" + +#include "event2/listener.h" + +namespace Envoy { +namespace Network { + +UdpListenerImpl::UdpListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, + UdpListenerCallbacks& cb) + : BaseListenerImpl(dispatcher, socket), cb_(cb) { + file_event_ = dispatcher_.createFileEvent( + socket.ioHandle().fd(), [this](uint32_t events) -> void { onSocketEvent(events); }, + Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write); + + ASSERT(file_event_); + + if (!Network::Socket::applyOptions(socket.options(), socket, + envoy::api::v2::core::SocketOption::STATE_BOUND)) { + throw CreateListenerException(fmt::format("cannot set post-bound socket option on socket: {}", + socket.localAddress()->asString())); + } +} + +UdpListenerImpl::~UdpListenerImpl() { + disable(); + file_event_.reset(); +} + +void UdpListenerImpl::disable() { file_event_->setEnabled(0); } + +void UdpListenerImpl::enable() { + file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write); +} + +UdpListenerImpl::ReceiveResult UdpListenerImpl::doRecvFrom(sockaddr_storage& peer_addr, + socklen_t& addr_len) { + constexpr uint64_t const read_length = 16384; + + Buffer::InstancePtr buffer = std::make_unique(); + + addr_len = sizeof(sockaddr_storage); + memset(&peer_addr, 0, addr_len); + + Buffer::RawSlice slice; + const uint64_t num_slices = buffer->reserve(read_length, &slice, 1); + + ASSERT(num_slices == 1); + // TODO(conqerAtapple): Use os_syscalls + const ssize_t rc = ::recvfrom(socket_.ioHandle().fd(), slice.mem_, read_length, 0, + reinterpret_cast(&peer_addr), &addr_len); + if (rc < 0) { + return ReceiveResult{Api::SysCallIntResult{static_cast(rc), errno}, nullptr}; + } + + slice.len_ = std::min(slice.len_, static_cast(rc)); + buffer->commit(&slice, 1); + + return ReceiveResult{Api::SysCallIntResult{static_cast(rc), 0}, std::move(buffer)}; +} + +void UdpListenerImpl::onSocketEvent(short flags) { + ASSERT((flags & (Event::FileReadyType::Read | Event::FileReadyType::Write))); + + if (flags & Event::FileReadyType::Read) { + handleReadCallback(); + } + + if (flags & Event::FileReadyType::Write) { + handleWriteCallback(); + } +} + +void UdpListenerImpl::handleReadCallback() { + sockaddr_storage addr; + socklen_t addr_len = 0; + + do { + ReceiveResult recv_result = doRecvFrom(addr, addr_len); + if ((recv_result.result_.rc_ < 0)) { + if (recv_result.result_.errno_ != EAGAIN) { + cb_.onError(UdpListenerCallbacks::ErrorCode::SyscallError, recv_result.result_.errno_); + } + return; + } + + if (recv_result.result_.rc_ == 0) { + // TODO(conqerAtapple): Is zero length packet interesting? + return; + } + + Address::InstanceConstSharedPtr local_address = socket_.localAddress(); + + RELEASE_ASSERT( + addr_len > 0, + fmt::format( + "Unable to get remote address for fd: {}, local address: {}. address length is 0 ", + socket_.ioHandle().fd(), local_address->asString())); + + Address::InstanceConstSharedPtr peer_address; + + // TODO(conqerAtApple): Current implementation of Address::addressFromSockAddr + // cannot be used here unfortunately. This should belong in Address namespace. + switch (addr.ss_family) { + case AF_INET: { + const struct sockaddr_in* sin = reinterpret_cast(&addr); + ASSERT(AF_INET == sin->sin_family); + peer_address = std::make_shared(sin); + + break; + } + case AF_INET6: { + const struct sockaddr_in6* sin6 = reinterpret_cast(&addr); + ASSERT(AF_INET6 == sin6->sin6_family); + if (IN6_IS_ADDR_V4MAPPED(&sin6->sin6_addr)) { +#if defined(__APPLE__) + struct sockaddr_in sin = { + {}, AF_INET, sin6->sin6_port, {sin6->sin6_addr.__u6_addr.__u6_addr32[3]}, {}}; +#else + struct sockaddr_in sin = {AF_INET, sin6->sin6_port, {sin6->sin6_addr.s6_addr32[3]}, {}}; +#endif + peer_address = std::make_shared(&sin); + } else { + peer_address = std::make_shared(*sin6, true); + } + + break; + } + + default: + RELEASE_ASSERT(false, + fmt::format("Unsupported address family: {}, local address: {}, receive size: " + "{}, address length: {}", + addr.ss_family, local_address->asString(), recv_result.result_.rc_, + addr_len)); + break; + } + + RELEASE_ASSERT((peer_address != nullptr), + fmt::format("Unable to get remote address for fd: {}, local address: {} ", + socket_.ioHandle().fd(), local_address->asString())); + + RELEASE_ASSERT((local_address != nullptr), + fmt::format("Unable to get local address for fd: {}", socket_.ioHandle().fd())); + + cb_.onData(UdpData{local_address, peer_address, std::move(recv_result.buffer_)}); + + } while (true); +} + +void UdpListenerImpl::handleWriteCallback() { cb_.onWriteReady(socket_); } + +} // namespace Network +} // namespace Envoy diff --git a/source/common/network/udp_listener_impl.h b/source/common/network/udp_listener_impl.h new file mode 100644 index 000000000000..2596a686b07e --- /dev/null +++ b/source/common/network/udp_listener_impl.h @@ -0,0 +1,46 @@ +#pragma once + +#include + +#include "common/buffer/buffer_impl.h" +#include "common/event/event_impl_base.h" +#include "common/event/file_event_impl.h" + +#include "base_listener_impl.h" + +namespace Envoy { +namespace Network { + +/** + * libevent implementation of Network::Listener for UDP. + */ +class UdpListenerImpl : public BaseListenerImpl { +public: + UdpListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, UdpListenerCallbacks& cb); + + ~UdpListenerImpl(); + + virtual void disable() override; + virtual void enable() override; + + struct ReceiveResult { + Api::SysCallIntResult result_; + Buffer::InstancePtr buffer_; + }; + + // Useful for testing/mocking. + virtual ReceiveResult doRecvFrom(sockaddr_storage& peer_addr, socklen_t& addr_len); + +protected: + void handleWriteCallback(); + void handleReadCallback(); + + UdpListenerCallbacks& cb_; + +private: + void onSocketEvent(short flags); + Event::FileEventPtr file_event_; +}; + +} // namespace Network +} // namespace Envoy diff --git a/test/common/network/BUILD b/test/common/network/BUILD index 61637c16b76d..0dc910b23e19 100644 --- a/test/common/network/BUILD +++ b/test/common/network/BUILD @@ -160,6 +160,24 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "udp_listener_impl_test", + srcs = ["udp_listener_impl_test.cc"], + deps = [ + "//source/common/event:dispatcher_lib", + "//source/common/network:address_lib", + "//source/common/network:listener_lib", + "//source/common/network:utility_lib", + "//source/common/stats:stats_lib", + "//test/mocks/network:network_mocks", + "//test/mocks/server:server_mocks", + "//test/test_common:environment_lib", + "//test/test_common:network_utility_lib", + "//test/test_common:test_time_lib", + "//test/test_common:utility_lib", + ], +) + envoy_cc_test( name = "resolver_test", srcs = ["resolver_impl_test.cc"], diff --git a/test/common/network/listener_impl_test.cc b/test/common/network/listener_impl_test.cc index 8c1933876e54..07c88b8b33e1 100644 --- a/test/common/network/listener_impl_test.cc +++ b/test/common/network/listener_impl_test.cc @@ -106,17 +106,6 @@ TEST_P(ListenerImplTest, SetListeningSocketOptionsSuccess) { TestListenerImpl listener(dispatcher_, socket, listener_callbacks, true, false); } -// Test that socket options are set after the listener is setup. -TEST_P(ListenerImplTest, UdpSetListeningSocketOptionsSuccess) { - Network::MockListenerCallbacks listener_callbacks; - Network::MockConnectionHandler connection_handler; - - Network::UdpListenSocket socket(Network::Test::getCanonicalLoopbackAddress(version_), nullptr, - true); - std::shared_ptr option = std::make_shared(); - socket.addOption(option); -} - // Test that an exception is thrown if there is an error setting socket options. TEST_P(ListenerImplTest, SetListeningSocketOptionsError) { Network::MockListenerCallbacks listener_callbacks; diff --git a/test/common/network/udp_listener_impl_test.cc b/test/common/network/udp_listener_impl_test.cc new file mode 100644 index 000000000000..3251d1b9775e --- /dev/null +++ b/test/common/network/udp_listener_impl_test.cc @@ -0,0 +1,521 @@ +#include +#include +#include + +#include "common/network/address_impl.h" +#include "common/network/udp_listener_impl.h" +#include "common/network/utility.h" + +#include "test/mocks/network/mocks.h" +#include "test/mocks/server/mocks.h" +#include "test/test_common/environment.h" +#include "test/test_common/network_utility.h" +#include "test/test_common/test_time.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::Invoke; +using testing::Return; + +namespace Envoy { +namespace Network { + +class TestUdpListenerImpl : public UdpListenerImpl { +public: + TestUdpListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, UdpListenerCallbacks& cb) + : UdpListenerImpl(dispatcher, socket, cb) {} + + MOCK_METHOD2(doRecvFrom, + UdpListenerImpl::ReceiveResult(sockaddr_storage& peer_addr, socklen_t& addr_len)); + + UdpListenerImpl::ReceiveResult doRecvFrom_(sockaddr_storage& peer_addr, socklen_t& addr_len) { + return UdpListenerImpl::doRecvFrom(peer_addr, addr_len); + } +}; + +class ListenerImplTest : public testing::TestWithParam { +protected: + ListenerImplTest() + : version_(GetParam()), + alt_address_(Network::Test::findOrCheckFreePort( + Network::Test::getCanonicalLoopbackAddress(version_), Address::SocketType::Stream)), + api_(Api::createApiForTest(stats_store_)), dispatcher_(test_time_.timeSystem(), *api_) {} + + SocketPtr getSocket(Address::SocketType type, const Address::InstanceConstSharedPtr& address, + const Network::Socket::OptionsSharedPtr& options, bool bind) { + if (type == Address::SocketType::Stream) { + using NetworkSocketTraitType = NetworkSocketTrait; + return std::make_unique>(address, options, bind); + } else if (type == Address::SocketType::Datagram) { + using NetworkSocketTraitType = NetworkSocketTrait; + return std::make_unique>(address, options, bind); + } + + return nullptr; + } + + // TODO(conqerAtapple): Move this to a common place(address.h?) + void getSocketAddressInfo(const Address::Ip* ip, uint32_t port, sockaddr_storage& addr, + socklen_t& sz) { + if (!ip) { + sz = 0; + return; + } + + memset(&addr, 0, sizeof(addr)); + + if (version_ == Address::IpVersion::v4) { + addr.ss_family = AF_INET; + auto const* ipv4 = ip->ipv4(); + if (!ipv4) { + sz = 0; + return; + } + + sockaddr_in* addrv4 = reinterpret_cast(&addr); + addrv4->sin_port = htons(port); + addrv4->sin_addr.s_addr = ipv4->address(); + + sz = sizeof(sockaddr_in); + } else if (version_ == Address::IpVersion::v6) { + addr.ss_family = AF_INET6; + auto const* ipv6 = ip->ipv6(); + if (!ipv6) { + sz = 0; + return; + } + + sockaddr_in6* addrv6 = reinterpret_cast(&addr); + addrv6->sin6_port = htons(port); + + const auto address = ipv6->address(); + memcpy(static_cast(&addrv6->sin6_addr.s6_addr), static_cast(&address), + sizeof(absl::uint128)); + + sz = sizeof(sockaddr_in6); + } else { + sz = 0; + } + } + + void getSocketAddressInfo(const Socket& socket, uint32_t port, sockaddr_storage& addr, + socklen_t& sz) { + getSocketAddressInfo(socket.localAddress()->ip(), port, addr, sz); + } + + void getSocketAddressInfo(Address::InstanceConstSharedPtr address, uint32_t port, + sockaddr_storage& addr, socklen_t& sz) { + ASSERT(address); + + getSocketAddressInfo(address->ip(), port, addr, sz); + } + + const Address::IpVersion version_; + const Address::InstanceConstSharedPtr alt_address_; + Stats::IsolatedStoreImpl stats_store_; + Api::ApiPtr api_; + DangerousDeprecatedTestTime test_time_; + Event::DispatcherImpl dispatcher_; +}; +INSTANTIATE_TEST_CASE_P(IpVersions, ListenerImplTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +// Test that socket options are set after the listener is setup. +TEST_P(ListenerImplTest, UdpSetListeningSocketOptionsSuccess) { + Network::MockListenerCallbacks listener_callbacks; + Network::MockConnectionHandler connection_handler; + + Network::UdpListenSocket socket(Network::Test::getCanonicalLoopbackAddress(version_), nullptr, + true); + std::shared_ptr option = std::make_shared(); + socket.addOption(option); +} + +/** + * Tests UDP listener for actual destination and data. + */ +TEST_P(ListenerImplTest, UseActualDstUdp) { + // Setup server socket + SocketPtr server_socket = + getSocket(Address::SocketType::Datagram, Network::Test::getCanonicalLoopbackAddress(version_), + nullptr, true); + + ASSERT_NE(server_socket, nullptr); + + auto const* server_ip = server_socket->localAddress()->ip(); + ASSERT_NE(server_ip, nullptr); + + // Setup callback handler and listener. + Network::MockUdpListenerCallbacks listener_callbacks; + Network::TestUdpListenerImpl listener(dispatcher_, *server_socket.get(), listener_callbacks); + + EXPECT_CALL(listener, doRecvFrom(_, _)) + .WillRepeatedly(Invoke([&](sockaddr_storage& peer_addr, socklen_t& addr_len) { + return listener.doRecvFrom_(peer_addr, addr_len); + })); + + // Setup client socket. + SocketPtr client_socket = + getSocket(Address::SocketType::Datagram, Network::Test::getCanonicalLoopbackAddress(version_), + nullptr, false); + + const int client_sockfd = client_socket->ioHandle().fd(); + sockaddr_storage server_addr; + socklen_t addr_len; + + getSocketAddressInfo(*client_socket.get(), server_ip->port(), server_addr, addr_len); + ASSERT_GT(addr_len, 0); + + // We send 2 packets + const std::string first("first"); + const std::string second("second"); + + auto send_rc = ::sendto(client_sockfd, first.c_str(), first.length(), 0, + reinterpret_cast(&server_addr), addr_len); + + ASSERT_EQ(send_rc, first.length()); + + send_rc = ::sendto(client_sockfd, second.c_str(), second.length(), 0, + reinterpret_cast(&server_addr), addr_len); + + ASSERT_EQ(send_rc, second.length()); + + auto validateCallParams = [&](Address::InstanceConstSharedPtr local_address, + Address::InstanceConstSharedPtr peer_address) { + ASSERT_NE(local_address, nullptr); + + ASSERT_NE(peer_address, nullptr); + ASSERT_NE(peer_address->ip(), nullptr); + + EXPECT_EQ(local_address->asString(), server_socket->localAddress()->asString()); + + EXPECT_EQ(peer_address->ip()->addressAsString(), + client_socket->localAddress()->ip()->addressAsString()); + + EXPECT_EQ(*local_address, *server_socket->localAddress()); + }; + + EXPECT_CALL(listener_callbacks, onData_(_)) + .WillOnce(Invoke([&](const UdpData& data) -> void { + validateCallParams(data.local_address_, data.peer_address_); + + EXPECT_EQ(data.buffer_->toString(), first); + })) + .WillOnce(Invoke([&](const UdpData& data) -> void { + validateCallParams(data.local_address_, data.peer_address_); + + EXPECT_EQ(data.buffer_->toString(), second); + + dispatcher_.exit(); + })); + + EXPECT_CALL(listener_callbacks, onWriteReady_(_)) + .WillRepeatedly(Invoke([&](const Socket& socket) { + EXPECT_EQ(socket.ioHandle().fd(), server_socket->ioHandle().fd()); + })); + + dispatcher_.run(Event::Dispatcher::RunType::Block); +} + +/** + * Tests UDP listener for read and write callbacks with actual data. + */ +TEST_P(ListenerImplTest, UdpEcho) { + // Setup server socket + SocketPtr server_socket = + getSocket(Address::SocketType::Datagram, Network::Test::getCanonicalLoopbackAddress(version_), + nullptr, true); + + ASSERT_NE(server_socket, nullptr); + + auto const* server_ip = server_socket->localAddress()->ip(); + ASSERT_NE(server_ip, nullptr); + + // Setup callback handler and listener. + Network::MockUdpListenerCallbacks listener_callbacks; + Network::TestUdpListenerImpl listener(dispatcher_, *server_socket.get(), listener_callbacks); + + EXPECT_CALL(listener, doRecvFrom(_, _)) + .WillRepeatedly(Invoke([&](sockaddr_storage& peer_addr, socklen_t& addr_len) { + return listener.doRecvFrom_(peer_addr, addr_len); + })); + + // Setup client socket. + SocketPtr client_socket = + getSocket(Address::SocketType::Datagram, Network::Test::getCanonicalLoopbackAddress(version_), + nullptr, false); + + const int client_sockfd = client_socket->ioHandle().fd(); + sockaddr_storage server_addr; + socklen_t addr_len; + + getSocketAddressInfo(*client_socket.get(), server_ip->port(), server_addr, addr_len); + ASSERT_GT(addr_len, 0); + + // We send 2 packets and exptect it to echo. + const std::string first("first"); + const std::string second("second"); + + auto send_rc = ::sendto(client_sockfd, first.c_str(), first.length(), 0, + reinterpret_cast(&server_addr), addr_len); + + ASSERT_EQ(send_rc, first.length()); + + send_rc = ::sendto(client_sockfd, second.c_str(), second.length(), 0, + reinterpret_cast(&server_addr), addr_len); + + ASSERT_EQ(send_rc, second.length()); + + auto validateCallParams = [&](Address::InstanceConstSharedPtr local_address, + Address::InstanceConstSharedPtr peer_address) { + ASSERT_NE(local_address, nullptr); + + ASSERT_NE(peer_address, nullptr); + ASSERT_NE(peer_address->ip(), nullptr); + + EXPECT_EQ(local_address->asString(), server_socket->localAddress()->asString()); + + EXPECT_EQ(peer_address->ip()->addressAsString(), + client_socket->localAddress()->ip()->addressAsString()); + + EXPECT_EQ(*local_address, *server_socket->localAddress()); + }; + + Event::TimerPtr timer = dispatcher_.createTimer([&] { dispatcher_.exit(); }); + + timer->enableTimer(std::chrono::milliseconds(2000)); + + // For unit test purposes, we assume that the data was received in order. + Address::InstanceConstSharedPtr test_peer_address; + + std::vector server_received_data; + + EXPECT_CALL(listener_callbacks, onData_(_)) + .WillOnce(Invoke([&](const UdpData& data) -> void { + validateCallParams(data.local_address_, data.peer_address_); + + test_peer_address = data.peer_address_; + + const std::string data_str = data.buffer_->toString(); + EXPECT_EQ(data_str, first); + + server_received_data.push_back(data_str); + })) + .WillOnce(Invoke([&](const UdpData& data) -> void { + validateCallParams(data.local_address_, data.peer_address_); + + const std::string data_str = data.buffer_->toString(); + EXPECT_EQ(data_str, second); + + server_received_data.push_back(data_str); + })); + + EXPECT_CALL(listener_callbacks, onWriteReady_(_)) + .WillRepeatedly(Invoke([&](const Socket& socket) { + EXPECT_EQ(socket.ioHandle().fd(), server_socket->ioHandle().fd()); + + sockaddr_storage client_addr; + socklen_t client_addr_len; + + ASSERT_NE(test_peer_address, nullptr); + const uint32_t peer_port = test_peer_address->ip()->port(); + + getSocketAddressInfo(test_peer_address, peer_port, client_addr, client_addr_len); + ASSERT_GT(client_addr_len, 0); + + for (const auto& data : server_received_data) { + const std::string::size_type data_size = data.length() + 1; + uint64_t total_sent = 0; + + do { + auto send_rc = ::sendto( + socket.ioHandle().fd(), data.c_str() + total_sent, data_size - total_sent, 0, + reinterpret_cast(&client_addr), client_addr_len); + + if (send_rc > 0) { + total_sent += send_rc; + if (total_sent >= data_size) { + break; + } + } else if (errno != EAGAIN) { + break; + } + } while (((send_rc < 0) && (errno == EAGAIN)) || (total_sent < data_size)); + + EXPECT_EQ(total_sent, data_size); + } + + server_received_data.clear(); + })); + + dispatcher_.run(Event::Dispatcher::RunType::Block); +} + +/** + * Tests UDP listener's `enable` and `disable` APIs. + */ +TEST_P(ListenerImplTest, UdpListenerEnableDisable) { + // Setup server socket + SocketPtr server_socket = + getSocket(Address::SocketType::Datagram, Network::Test::getCanonicalLoopbackAddress(version_), + nullptr, true); + + ASSERT_NE(server_socket, nullptr); + + auto const* server_ip = server_socket->localAddress()->ip(); + ASSERT_NE(server_ip, nullptr); + + // Setup callback handler and listener. + Network::MockUdpListenerCallbacks listener_callbacks; + Network::TestUdpListenerImpl listener(dispatcher_, *server_socket.get(), listener_callbacks); + + EXPECT_CALL(listener, doRecvFrom(_, _)) + .WillRepeatedly(Invoke([&](sockaddr_storage& peer_addr, socklen_t& addr_len) { + return listener.doRecvFrom_(peer_addr, addr_len); + })); + + // Setup client socket. + SocketPtr client_socket = + getSocket(Address::SocketType::Datagram, Network::Test::getCanonicalLoopbackAddress(version_), + nullptr, false); + + const int client_sockfd = client_socket->ioHandle().fd(); + sockaddr_storage server_addr; + socklen_t addr_len; + + getSocketAddressInfo(*client_socket.get(), server_ip->port(), server_addr, addr_len); + ASSERT_GT(addr_len, 0); + + // We first disable the listener and then send two packets. + // - With the listener disabled, we expect that none of the callbacks will be + // called. + // - When the listener is enabled back, we expect the callbacks to be called + const std::string first("first"); + const std::string second("second"); + + listener.disable(); + + auto send_rc = ::sendto(client_sockfd, first.c_str(), first.length(), 0, + reinterpret_cast(&server_addr), addr_len); + + ASSERT_EQ(send_rc, first.length()); + + send_rc = ::sendto(client_sockfd, second.c_str(), second.length(), 0, + reinterpret_cast(&server_addr), addr_len); + + ASSERT_EQ(send_rc, second.length()); + + auto validateCallParams = [&](Address::InstanceConstSharedPtr local_address, + Address::InstanceConstSharedPtr peer_address) { + ASSERT_NE(local_address, nullptr); + + ASSERT_NE(peer_address, nullptr); + ASSERT_NE(peer_address->ip(), nullptr); + + ASSERT_EQ(local_address->asString(), server_socket->localAddress()->asString()); + + ASSERT_EQ(peer_address->ip()->addressAsString(), + client_socket->localAddress()->ip()->addressAsString()); + + EXPECT_EQ(*local_address, *server_socket->localAddress()); + }; + + Event::TimerPtr timer = dispatcher_.createTimer([&] { dispatcher_.exit(); }); + + timer->enableTimer(std::chrono::milliseconds(2000)); + + EXPECT_CALL(listener_callbacks, onData_(_)).Times(0); + + EXPECT_CALL(listener_callbacks, onWriteReady_(_)).Times(0); + + dispatcher_.run(Event::Dispatcher::RunType::Block); + + listener.enable(); + + EXPECT_CALL(listener_callbacks, onData_(_)) + .Times(2) + .WillOnce(Return()) + .WillOnce(Invoke([&](const UdpData& data) -> void { + validateCallParams(data.local_address_, data.peer_address_); + + EXPECT_EQ(data.buffer_->toString(), second); + + dispatcher_.exit(); + })); + + EXPECT_CALL(listener_callbacks, onWriteReady_(_)) + .WillRepeatedly(Invoke([&](const Socket& socket) { + EXPECT_EQ(socket.ioHandle().fd(), server_socket->ioHandle().fd()); + })); + + dispatcher_.run(Event::Dispatcher::RunType::Block); +} + +/** + * Tests UDP listebe's error callback. + */ +TEST_P(ListenerImplTest, UdpListenerRecvFromError) { + // Setup server socket + SocketPtr server_socket = + getSocket(Address::SocketType::Datagram, Network::Test::getCanonicalLoopbackAddress(version_), + nullptr, true); + + ASSERT_NE(server_socket, nullptr); + + auto const* server_ip = server_socket->localAddress()->ip(); + ASSERT_NE(server_ip, nullptr); + + // Setup callback handler and listener. + Network::MockUdpListenerCallbacks listener_callbacks; + Network::TestUdpListenerImpl listener(dispatcher_, *server_socket.get(), listener_callbacks); + + EXPECT_CALL(listener, doRecvFrom(_, _)).WillRepeatedly(Invoke([&](sockaddr_storage&, socklen_t&) { + return UdpListenerImpl::ReceiveResult{{-1, -1}, nullptr}; + })); + + SocketPtr client_socket = + getSocket(Address::SocketType::Datagram, Network::Test::getCanonicalLoopbackAddress(version_), + nullptr, false); + + const int client_sockfd = client_socket->ioHandle().fd(); + sockaddr_storage server_addr; + socklen_t addr_len; + + getSocketAddressInfo(*client_socket.get(), server_ip->port(), server_addr, addr_len); + ASSERT_GT(addr_len, 0); + + // When the `receive` system call returns an error, we expect the `onError` + // callback callwed with `SyscallError` parameter. + const std::string first("first"); + + auto send_rc = ::sendto(client_sockfd, first.c_str(), first.length(), 0, + reinterpret_cast(&server_addr), addr_len); + + ASSERT_EQ(send_rc, first.length()); + + EXPECT_CALL(listener_callbacks, onData_(_)).Times(0); + + EXPECT_CALL(listener_callbacks, onWriteReady_(_)) + .Times(1) + .WillRepeatedly(Invoke([&](const Socket& socket) { + EXPECT_EQ(socket.ioHandle().fd(), server_socket->ioHandle().fd()); + })); + + EXPECT_CALL(listener_callbacks, onError_(_, _)) + .Times(1) + .WillOnce(Invoke([&](const UdpListenerCallbacks::ErrorCode& err_code, int err) -> void { + ASSERT_EQ(err_code, UdpListenerCallbacks::ErrorCode::SyscallError); + ASSERT_EQ(err, -1); + + dispatcher_.exit(); + })); + + dispatcher_.run(Event::Dispatcher::RunType::Block); +} + +} // namespace Network +} // namespace Envoy diff --git a/test/mocks/event/mocks.h b/test/mocks/event/mocks.h index 6f11b90b1cc5..55ef6403498a 100644 --- a/test/mocks/event/mocks.h +++ b/test/mocks/event/mocks.h @@ -65,6 +65,11 @@ class MockDispatcher : public Dispatcher { createListener_(socket, cb, bind_to_port, hand_off_restored_destination_connections)}; } + Network::ListenerPtr createUdpListener(Network::Socket& socket, + Network::UdpListenerCallbacks& cb) override { + return Network::ListenerPtr{createUdpListener_(socket, cb)}; + } + Event::TimerPtr createTimer(Event::TimerCb cb) override { return Event::TimerPtr{createTimer_(cb)}; } @@ -101,6 +106,8 @@ class MockDispatcher : public Dispatcher { Network::Listener*(Network::Socket& socket, Network::ListenerCallbacks& cb, bool bind_to_port, bool hand_off_restored_destination_connections)); + MOCK_METHOD2(createUdpListener_, + Network::Listener*(Network::Socket& socket, Network::UdpListenerCallbacks& cb)); MOCK_METHOD1(createTimer_, Timer*(Event::TimerCb cb)); MOCK_METHOD1(deferredDelete_, void(DeferredDeletable* to_delete)); MOCK_METHOD0(exit, void()); diff --git a/test/mocks/network/mocks.cc b/test/mocks/network/mocks.cc index c91d083ef175..538c5c36cd31 100644 --- a/test/mocks/network/mocks.cc +++ b/test/mocks/network/mocks.cc @@ -78,6 +78,9 @@ MockFilter::~MockFilter() {} MockListenerCallbacks::MockListenerCallbacks() {} MockListenerCallbacks::~MockListenerCallbacks() {} +MockUdpListenerCallbacks::MockUdpListenerCallbacks() {} +MockUdpListenerCallbacks::~MockUdpListenerCallbacks() {} + MockDrainDecision::MockDrainDecision() {} MockDrainDecision::~MockDrainDecision() {} diff --git a/test/mocks/network/mocks.h b/test/mocks/network/mocks.h index d0dc937cea47..5ec2f8183e56 100644 --- a/test/mocks/network/mocks.h +++ b/test/mocks/network/mocks.h @@ -117,6 +117,24 @@ class MockListenerCallbacks : public ListenerCallbacks { MOCK_METHOD1(onNewConnection_, void(ConnectionPtr& conn)); }; +class MockUdpListenerCallbacks : public UdpListenerCallbacks { +public: + MockUdpListenerCallbacks(); + ~MockUdpListenerCallbacks(); + + void onData(const UdpData& data) override { onData_(data); } + + void onWriteReady(const Socket& socket) override { onWriteReady_(socket); } + + void onError(const ErrorCode& err_code, int err) override { onError_(err_code, err); } + + MOCK_METHOD1(onData_, void(const UdpData& data)); + + MOCK_METHOD1(onWriteReady_, void(const Socket& socket)); + + MOCK_METHOD2(onError_, void(const ErrorCode& err_code, int err)); +}; + class MockDrainDecision : public DrainDecision { public: MockDrainDecision(); @@ -295,6 +313,7 @@ class MockConnectionHandler : public ConnectionHandler { MOCK_METHOD0(numConnections, uint64_t()); MOCK_METHOD1(addListener, void(ListenerConfig& config)); + MOCK_METHOD1(addUdpListener, void(ListenerConfig& config)); MOCK_METHOD1(findListenerByAddress, Network::Listener*(const Network::Address::Instance& address)); MOCK_METHOD1(removeListeners, void(uint64_t listener_tag));