Skip to content

Commit

Permalink
Introducing UDP listener. (envoyproxy#5473)
Browse files Browse the repository at this point in the history
Signed-off-by: Jojy G Varghese <jojy_varghese@apple.com>
Signed-off-by: Fred Douglas <fredlas@google.com>
  • Loading branch information
conqerAtapple authored and fredlas committed Mar 5, 2019
1 parent b22ef87 commit 0ce6578
Show file tree
Hide file tree
Showing 17 changed files with 949 additions and 54 deletions.
8 changes: 8 additions & 0 deletions include/envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ class Dispatcher {
Network::ListenerCallbacks& cb, bool bind_to_port,
bool hand_off_restored_destination_connections) PURE;

/**
* Create a logical udp listener on a specific port.
* @param socket supplies the socket to listen on.
* @param cb supplies the udp listener callbacks to invoke for listener events.
* @return Network::ListenerPtr a new listener that is owned by the caller.
*/
virtual Network::ListenerPtr createUdpListener(Network::Socket& socket,
Network::UdpListenerCallbacks& cb) PURE;
/**
* Allocate a timer. @see Timer for docs on how to use the timer.
* @param cb supplies the callback to invoke when the timer fires.
Expand Down
53 changes: 53 additions & 0 deletions include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,59 @@ class ListenerCallbacks {
virtual void onNewConnection(ConnectionPtr&& new_connection) PURE;
};

/**
* Utility struct that encapsulates the information from a udp socket's
* recvfrom/recvmmsg call.
*
* TODO(conqerAtapple): Maybe this belongs inside the UdpListenerCallbacks
* class.
*/
struct UdpData {
Address::InstanceConstSharedPtr local_address_;
Address::InstanceConstSharedPtr peer_address_; // TODO(conquerAtapple): Fix ownership semantics.
Buffer::InstancePtr buffer_;
// TODO(conquerAtapple):
// Add UdpReader here so that the callback handler can
// then use the reader to do multiple reads(recvmmsg) once the OS notifies it
// has data. We could also just return a `ReaderFactory` that returns either a
// `recvfrom` reader (with peer information) or a `read/recvmmsg` reader. This
// is still being flushed out (Jan, 2019).
};

/**
* Udp listener callbacks.
*/
class UdpListenerCallbacks {
public:
enum class ErrorCode { SyscallError, UnknownError };

virtual ~UdpListenerCallbacks() = default;

/**
* Called whenever data is received by the underlying udp socket.
*
* @param data UdpData from the underlying socket.
*/
virtual void onData(const UdpData& data) PURE;

/**
* Called when the underlying socket is ready for write.
*
* @param socket Underlying server socket for the listener.
*
* TODO(conqerAtapple): Maybe we need a UdpWriter here instead of Socket.
*/
virtual void onWriteReady(const Socket& socket) PURE;

/**
* Called when there is an error event.
*
* @param error_code ErrorCode for the error event.
* @param error_number System error number.
*/
virtual void onError(const ErrorCode& error_code, int error_number) PURE;
};

/**
* An abstract socket listener. Free the listener to stop listening on the socket.
*/
Expand Down
7 changes: 7 additions & 0 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "common/network/connection_impl.h"
#include "common/network/dns_impl.h"
#include "common/network/listener_impl.h"
#include "common/network/udp_listener_impl.h"

#include "event2/event.h"

Expand Down Expand Up @@ -119,6 +120,12 @@ DispatcherImpl::createListener(Network::Socket& socket, Network::ListenerCallbac
hand_off_restored_destination_connections)};
}

Network::ListenerPtr DispatcherImpl::createUdpListener(Network::Socket& socket,
Network::UdpListenerCallbacks& cb) {
ASSERT(isThreadSafe());
return Network::ListenerPtr{new Network::UdpListenerImpl(*this, socket, cb)};
}

TimerPtr DispatcherImpl::createTimer(TimerCb cb) {
ASSERT(isThreadSafe());
return scheduler_->createTimer(cb);
Expand Down
2 changes: 2 additions & 0 deletions source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>, public Dispatcher {
Network::ListenerPtr createListener(Network::Socket& socket, Network::ListenerCallbacks& cb,
bool bind_to_port,
bool hand_off_restored_destination_connections) override;
Network::ListenerPtr createUdpListener(Network::Socket& socket,
Network::UdpListenerCallbacks& cb) override;
TimerPtr createTimer(TimerCb cb) override;
void deferredDelete(DeferredDeletablePtr&& to_delete) override;
void exit() override;
Expand Down
9 changes: 8 additions & 1 deletion source/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,15 @@ envoy_cc_library(
envoy_cc_library(
name = "listener_lib",
srcs = [
"base_listener_impl.cc",
"listener_impl.cc",
"udp_listener_impl.cc",
],
hdrs = [
"base_listener_impl.h",
"listener_impl.h",
"udp_listener_impl.h",
],
hdrs = ["listener_impl.h"],
deps = [
":address_lib",
":io_socket_handle_lib",
Expand All @@ -151,6 +157,7 @@ envoy_cc_library(
"//include/envoy/network:listener_interface",
"//include/envoy/stats:stats_interface",
"//include/envoy/stats:stats_macros",
"//source/common/buffer:buffer_lib",
"//source/common/common:assert_lib",
"//source/common/common:empty_string",
"//source/common/common:linked_object",
Expand Down
35 changes: 35 additions & 0 deletions source/common/network/base_listener_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#include "common/network/base_listener_impl.h"

#include <sys/un.h>

#include "envoy/common/exception.h"

#include "common/common/assert.h"
#include "common/common/empty_string.h"
#include "common/common/fmt.h"
#include "common/event/dispatcher_impl.h"
#include "common/event/file_event_impl.h"
#include "common/network/address_impl.h"

#include "event2/listener.h"

namespace Envoy {
namespace Network {

Address::InstanceConstSharedPtr BaseListenerImpl::getLocalAddress(int fd) {
return Address::addressFromFd(fd);
}

BaseListenerImpl::BaseListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket)
: local_address_(nullptr), dispatcher_(dispatcher), socket_(socket) {
const auto ip = socket.localAddress()->ip();

// Only use the listen socket's local address for new connections if it is not the all hosts
// address (e.g., 0.0.0.0 for IPv4).
if (!(ip && ip->isAnyAddress())) {
local_address_ = socket.localAddress();
}
}

} // namespace Network
} // namespace Envoy
30 changes: 30 additions & 0 deletions source/common/network/base_listener_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include "envoy/network/listener.h"

#include "common/event/dispatcher_impl.h"
#include "common/event/libevent.h"
#include "common/network/listen_socket_impl.h"

#include "event2/event.h"

namespace Envoy {
namespace Network {

/**
* Base libevent implementation of Network::Listener.
*/
class BaseListenerImpl : public Listener {
public:
BaseListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket);

protected:
virtual Address::InstanceConstSharedPtr getLocalAddress(int fd);

Address::InstanceConstSharedPtr local_address_;
Event::DispatcherImpl& dispatcher_;
Socket& socket_;
};

} // namespace Network
} // namespace Envoy
48 changes: 20 additions & 28 deletions source/common/network/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@
namespace Envoy {
namespace Network {

Address::InstanceConstSharedPtr ListenerImpl::getLocalAddress(int fd) {
return Address::addressFromFd(fd);
}

void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
int remote_addr_len, void* arg) {
ListenerImpl* listener = static_cast<ListenerImpl*>(arg);
Expand Down Expand Up @@ -51,35 +47,31 @@ void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr*
listener->hand_off_restored_destination_connections_);
}

ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb,
bool bind_to_port, bool hand_off_restored_destination_connections)
: local_address_(nullptr), cb_(cb),
hand_off_restored_destination_connections_(hand_off_restored_destination_connections),
listener_(nullptr) {
const auto ip = socket.localAddress()->ip();
void ListenerImpl::setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket) {
listener_.reset(
evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd()));

// Only use the listen socket's local address for new connections if it is not the all hosts
// address (e.g., 0.0.0.0 for IPv4).
if (!(ip && ip->isAnyAddress())) {
local_address_ = socket.localAddress();
if (!listener_) {
throw CreateListenerException(
fmt::format("cannot listen on socket: {}", socket.localAddress()->asString()));
}

if (bind_to_port) {
listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1,
socket.ioHandle().fd()));

if (!listener_) {
throw CreateListenerException(
fmt::format("cannot listen on socket: {}", socket.localAddress()->asString()));
}
if (!Network::Socket::applyOptions(socket.options(), socket,
envoy::api::v2::core::SocketOption::STATE_LISTENING)) {
throw CreateListenerException(fmt::format("cannot set post-listen socket option on socket: {}",
socket.localAddress()->asString()));
}

if (!Network::Socket::applyOptions(socket.options(), socket,
envoy::api::v2::core::SocketOption::STATE_LISTENING)) {
throw CreateListenerException(fmt::format(
"cannot set post-listen socket option on socket: {}", socket.localAddress()->asString()));
}
evconnlistener_set_error_cb(listener_.get(), errorCallback);
}

evconnlistener_set_error_cb(listener_.get(), errorCallback);
ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb,
bool bind_to_port, bool hand_off_restored_destination_connections)
: BaseListenerImpl(dispatcher, socket), cb_(cb),
hand_off_restored_destination_connections_(hand_off_restored_destination_connections),
listener_(nullptr) {
if (bind_to_port) {
setupServerSocket(dispatcher, socket);
}
}

Expand Down
22 changes: 8 additions & 14 deletions source/common/network/listener_impl.h
Original file line number Diff line number Diff line change
@@ -1,38 +1,32 @@
#pragma once

#include "envoy/network/listener.h"

#include "common/event/dispatcher_impl.h"
#include "common/event/libevent.h"
#include "common/network/listen_socket_impl.h"

#include "event2/event.h"
#include "base_listener_impl.h"

namespace Envoy {
namespace Network {

/**
* libevent implementation of Network::Listener.
* libevent implementation of Network::Listener for TCP.
* TODO(conqerAtapple): Consider renaming the class to `TcpListenerImpl`.
*/
class ListenerImpl : public Listener {
class ListenerImpl : public BaseListenerImpl {
public:
ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb,
bool bind_to_port, bool hand_off_restored_destination_connections);

void disable();
void enable();
void disable() override;
void enable() override;

protected:
virtual Address::InstanceConstSharedPtr getLocalAddress(int fd);
void setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket);

Address::InstanceConstSharedPtr local_address_;
ListenerCallbacks& cb_;
const bool hand_off_restored_destination_connections_;

private:
static void errorCallback(evconnlistener* listener, void* context);
static void listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
int remote_addr_len, void* arg);
static void errorCallback(evconnlistener* listener, void* context);

Event::Libevent::ListenerPtr listener_;
};
Expand Down
Loading

0 comments on commit 0ce6578

Please sign in to comment.