Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing UDP listener. #5473

Merged
merged 32 commits into from
Jan 30, 2019
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
0509ee3
Introducing UDP listener.
conqerAtapple Jan 4, 2019
eca712d
Removed MSG_CONFIRM flag for sendto method.
conqerAtapple Jan 4, 2019
37232e4
Added more tests for UDP listener.
conqerAtapple Jan 4, 2019
3c7e6f6
Fixed recvfrom arguments.
conqerAtapple Jan 4, 2019
6389f68
Taking care of -EAGAIN.
conqerAtapple Jan 4, 2019
bd240a5
Added support for mock buffer.
conqerAtapple Jan 4, 2019
3a973f1
Added support for onError callback.
conqerAtapple Jan 5, 2019
6803aee
Removed move operator.
conqerAtapple Jan 5, 2019
eed7d49
Addressed some review comments.
conqerAtapple Jan 10, 2019
0bcfbb8
Added `onWriteReady` implementation.
conqerAtapple Jan 11, 2019
a1afecf
Added echo test for UDP listener.
conqerAtapple Jan 11, 2019
c375b54
Added recvFrom in UdpListener.
conqerAtapple Jan 12, 2019
8e49e6b
Removed recvFrom from Buffer.
conqerAtapple Jan 12, 2019
0749e32
Fix tests.
conqerAtapple Jan 12, 2019
7ae3884
Added UdpData structure.
conqerAtapple Jan 13, 2019
4c25d18
s/createUdpListener/createDatagramListener
conqerAtapple Jan 13, 2019
e85347d
Adding some comments for ideas to be discussed.
conqerAtapple Jan 14, 2019
17ba123
Moved udp tests to its own file.
conqerAtapple Jan 16, 2019
cd8930e
Parameter names.
conqerAtapple Jan 16, 2019
f128673
Added more comments for things to discuss.
conqerAtapple Jan 17, 2019
b83ff51
Added TODO for reconsidering ownership semantics.
conqerAtapple Jan 18, 2019
e8003b2
Fixed onError API parameter names.
conqerAtapple Jan 24, 2019
84b6e68
Code review addressed (@mpwarres).
conqerAtapple Jan 24, 2019
a84a96d
Fixed merge issues.
conqerAtapple Jan 25, 2019
ee78778
Fix format.
conqerAtapple Jan 25, 2019
9bcfacf
Fixed EAGAIN handling.
conqerAtapple Jan 26, 2019
4f8de99
Fixed asserts.
conqerAtapple Jan 26, 2019
bc2695a
fixed typo.
conqerAtapple Jan 26, 2019
3c1f89a
Changed to FileEvent.
conqerAtapple Jan 27, 2019
ce18742
Removed extraneous parenthesis.
conqerAtapple Jan 27, 2019
972819a
Removed const'ness for Dispatcher in listener.
conqerAtapple Jan 27, 2019
3de381d
Removed const Dispatcher method.
conqerAtapple Jan 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion include/envoy/event/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Dispatcher {
* initially listen on.
*/
virtual FileEventPtr createFileEvent(int fd, FileReadyCb cb, FileTriggerType trigger,
uint32_t events) PURE;
uint32_t events) const PURE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are somewhat splitting hairs here, but IMO this is not a const operation on the dispatcher, it does modify its state. Instead of changing this to a const method can you pass a non-const Dispatcher to where you need it? This is what is done elsewhere.


/**
* @return Filesystem::WatcherPtr a filesystem watcher owned by the caller.
Expand All @@ -110,6 +110,14 @@ class Dispatcher {
Network::ListenerCallbacks& cb, bool bind_to_port,
bool hand_off_restored_destination_connections) PURE;

/**
* Create a logical udp listener on a specific port.
* @param socket supplies the socket to listen on.
* @param cb supplies the udp listener callbacks to invoke for listener events.
* @return Network::ListenerPtr a new listener that is owned by the caller.
*/
virtual Network::ListenerPtr createUdpListener(Network::Socket& socket,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering whether we will ever want to support proxying datagrams over Unix domain sockets, in addition to over UDP. (Analogous to current Envoy support for pipes vs. TCP connections.) If so, then it might be more general to name this createDatagramListener. Or maybe UDS datagrams aren't worth supporting, in which case current name SGTM.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mpwarres Does gQUIC currently use Unix domain sockets for proxying datagrams? I vaguely recall hearing about a potential experiment with them in a QUIC talk.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that I'm aware. We have used SOCK_DGRAM + shared memory to allow GFEs (speaking QUIC) to efficiently front for other (non QUIC speaking) servers, but AIUI there the datagrams were just for coordinating shared memory use, not for carrying QUIC traffic. Not sure if @ianswett may have mentioned this in one of his talks, perhaps this one, though I don't see it in the slides. The comment I posted upthread was more hypothetical--I don't have a concrete use case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion on this, though createDatagramListener() does seem more generic.

Network::UdpListenerCallbacks& cb) PURE;
/**
* Allocate a timer. @see Timer for docs on how to use the timer.
* @param cb supplies the callback to invoke when the timer fires.
Expand Down
53 changes: 53 additions & 0 deletions include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,59 @@ class ListenerCallbacks {
virtual void onNewConnection(ConnectionPtr&& new_connection) PURE;
};

/**
* Utility struct that encapsulates the information from a udp socket's
* recvfrom/recvmmsg call.
*
* TODO(conqerAtapple): Maybe this belongs inside the UdpListenerCallbacks
* class.
*/
struct UdpData {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional naming suggestion (feel free to disregard): UdpPacket

Address::InstanceConstSharedPtr local_address_;
Address::InstanceConstSharedPtr peer_address_; // TODO(conquerAtapple): Fix ownership semantics.
Buffer::InstancePtr buffer_;
// TODO(conquerAtapple):
// Add UdpReader here so that the callback handler can
// then use the reader to do multiple reads(recvmmsg) once the OS notifies it
// has data. We could also just return a `ReaderFactory` that returns either a
// `recvfrom` reader (with peer information) or a `read/recvmmsg` reader. This
// is still being flushed out (Jan, 2019).
};

/**
* Udp listener callbacks.
*/
class UdpListenerCallbacks {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought Matt requested on the prior PR that we try to stick to listenercallbacks where possible?

I'd think the current code could be a subclass of ListenerCallbacks, where onAccept was commeted as "accepting new data" rather than "accepting a new connection", we call onNewConnection where we currently do (but with a UdpConnection subclass which may be filled with TODOs, but we pass the local address and the peer address to it) and then hand onData calls to the new connection?

Copy link
Contributor Author

@conqerAtapple conqerAtapple Jan 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above. The current ListenerCallbacks interface exposes connection concept. Inheriting that interface would semantically mean - UdpListenerCallbacks is-a ListenerCallbacks. Not sure if we mean that. What about having a bridge that bridges the UDP listener to the existing ListenerCallbacks? This bridge could be a concrete implementation of a UdpListenerCallbacks interface. Thinking out loud here. Would he happy to hear what you think.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comments below.

public:
enum class ErrorCode { SyscallError, UnknownError };

virtual ~UdpListenerCallbacks() = default;

/**
* Called whenever data is received by the underlying udp socket.
*
* @param data UdpData from the underlying socket.
*/
virtual void onData(const UdpData& data) PURE;

/**
* Called when the underlying socket is ready for write.
*
* @param socket Underlying server socket for the listener.
*
* TODO(conqerAtapple): Maybe we need a UdpWriter here instead of Socket.
*/
virtual void onWriteReady(const Socket& socket) PURE;

/**
* Called when there is an error event.
*
* @param error_code ErrorCode for the error event.
* @param error_number System error number.
*/
virtual void onError(const ErrorCode& error_code, int error_number) PURE;
};

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the listener might also need to be notified when the UDP socket is writable? AIUI, in the TCP case, this interaction is a little hidden: ListenerCallbacks::onNewConnection() provides a Network::Connection, and that Connection is set up with the Dispatcher to receive events when the connection socket is readable or writable: link. The "is readable" transition is already covered by UdpListenerCallbacks::onData(), but seems like there will need to be something somewhere for the "is writable" side, to let the receipient know when it can flush/write more packets.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. So I guess we need another callback onWriteReady ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it depends on the UdpListenerCallbacks vs. ListenerCallbacks question. If you go with the already existing ListenerCallbacks interface like @alyssawilk is suggesting, then it would make sense to reflect writability inside the Connection implementation, i.e. Event::FileReadyType::Write -> UdpConnectionImpl::onWriteReady() which could flush any buffered packets that are waiting to be written, paired with a UdpConnectionImpl::write() implementation that buffers packets when the socket is not currently writable.

If you go with UdpListenerCallbacks, then yeah, I guess there would need to be something like an onWriteReady(), and it would need to supply something to write to (either the Socket, or an interface for writing a packet).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how to reuse existing ListenerCallbacks for UDP since it has connection concepts. I am leaning towards having onWriteReady in UdpListener. Let me know if you (@alyssawilk) feel strongly about this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is related to my comment above about whether we are doing connection oriented or not. IMO I'm leaning towads splitting UDP into 2 types: 1) What you have now, which is geared towards datagram, and 2) One listener type which uses normal listener callbacks, and spits out connections which then supports existing filters. Thoughts?

/**
* An abstract socket listener. Free the listener to stop listening on the socket.
*/
Expand Down
9 changes: 8 additions & 1 deletion source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "common/network/connection_impl.h"
#include "common/network/dns_impl.h"
#include "common/network/listener_impl.h"
#include "common/network/udp_listener_impl.h"

#include "event2/event.h"

Expand Down Expand Up @@ -101,7 +102,7 @@ Network::DnsResolverSharedPtr DispatcherImpl::createDnsResolver(
}

FileEventPtr DispatcherImpl::createFileEvent(int fd, FileReadyCb cb, FileTriggerType trigger,
uint32_t events) {
uint32_t events) const {
ASSERT(isThreadSafe());
return FileEventPtr{new FileEventImpl(*this, fd, cb, trigger, events)};
}
Expand All @@ -119,6 +120,12 @@ DispatcherImpl::createListener(Network::Socket& socket, Network::ListenerCallbac
hand_off_restored_destination_connections)};
}

Network::ListenerPtr DispatcherImpl::createUdpListener(Network::Socket& socket,
Network::UdpListenerCallbacks& cb) {
ASSERT(isThreadSafe());
return Network::ListenerPtr{new Network::UdpListenerImpl(*this, socket, cb)};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: std::make_unique<>

}

TimerPtr DispatcherImpl::createTimer(TimerCb cb) {
ASSERT(isThreadSafe());
return scheduler_->createTimer(cb);
Expand Down
6 changes: 4 additions & 2 deletions source/common/event/dispatcher_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>, public Dispatcher {
/**
* @return event_base& the libevent base.
*/
event_base& base() { return *base_; }
event_base& base() const { return *base_; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you revert this const change also and a couple of the other ones you added?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove them but I think they should be const :)


// Event::Dispatcher
TimeSystem& timeSystem() override { return time_system_; }
Expand All @@ -46,11 +46,13 @@ class DispatcherImpl : Logger::Loggable<Logger::Id::main>, public Dispatcher {
Network::DnsResolverSharedPtr createDnsResolver(
const std::vector<Network::Address::InstanceConstSharedPtr>& resolvers) override;
FileEventPtr createFileEvent(int fd, FileReadyCb cb, FileTriggerType trigger,
uint32_t events) override;
uint32_t events) const override;
Filesystem::WatcherPtr createFilesystemWatcher() override;
Network::ListenerPtr createListener(Network::Socket& socket, Network::ListenerCallbacks& cb,
bool bind_to_port,
bool hand_off_restored_destination_connections) override;
Network::ListenerPtr createUdpListener(Network::Socket& socket,
Network::UdpListenerCallbacks& cb) override;
TimerPtr createTimer(TimerCb cb) override;
void deferredDelete(DeferredDeletablePtr&& to_delete) override;
void exit() override;
Expand Down
2 changes: 1 addition & 1 deletion source/common/event/file_event_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
namespace Envoy {
namespace Event {

FileEventImpl::FileEventImpl(DispatcherImpl& dispatcher, int fd, FileReadyCb cb,
FileEventImpl::FileEventImpl(const DispatcherImpl& dispatcher, int fd, FileReadyCb cb,
FileTriggerType trigger, uint32_t events)
: cb_(cb), base_(&dispatcher.base()), fd_(fd), trigger_(trigger) {
assignEvents(events);
Expand Down
2 changes: 1 addition & 1 deletion source/common/event/file_event_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Event {
*/
class FileEventImpl : public FileEvent, ImplBase {
public:
FileEventImpl(DispatcherImpl& dispatcher, int fd, FileReadyCb cb, FileTriggerType trigger,
FileEventImpl(const DispatcherImpl& dispatcher, int fd, FileReadyCb cb, FileTriggerType trigger,
uint32_t events);

// Event::FileEvent
Expand Down
9 changes: 8 additions & 1 deletion source/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,15 @@ envoy_cc_library(
envoy_cc_library(
name = "listener_lib",
srcs = [
"base_listener_impl.cc",
"listener_impl.cc",
"udp_listener_impl.cc",
],
hdrs = [
"base_listener_impl.h",
"listener_impl.h",
"udp_listener_impl.h",
],
hdrs = ["listener_impl.h"],
deps = [
":address_lib",
":io_socket_handle_lib",
Expand All @@ -151,6 +157,7 @@ envoy_cc_library(
"//include/envoy/network:listener_interface",
"//include/envoy/stats:stats_interface",
"//include/envoy/stats:stats_macros",
"//source/common/buffer:buffer_lib",
"//source/common/common:assert_lib",
"//source/common/common:empty_string",
"//source/common/common:linked_object",
Expand Down
35 changes: 35 additions & 0 deletions source/common/network/base_listener_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#include "common/network/base_listener_impl.h"

#include <sys/un.h>

#include "envoy/common/exception.h"

#include "common/common/assert.h"
#include "common/common/empty_string.h"
#include "common/common/fmt.h"
#include "common/event/dispatcher_impl.h"
#include "common/event/file_event_impl.h"
#include "common/network/address_impl.h"

#include "event2/listener.h"

namespace Envoy {
namespace Network {

Address::InstanceConstSharedPtr BaseListenerImpl::getLocalAddress(int fd) {
return Address::addressFromFd(fd);
}

BaseListenerImpl::BaseListenerImpl(const Event::DispatcherImpl& dispatcher, Socket& socket)
: local_address_(nullptr), dispatcher_(dispatcher), socket_(socket) {
const auto ip = socket.localAddress()->ip();

// Only use the listen socket's local address for new connections if it is not the all hosts
// address (e.g., 0.0.0.0 for IPv4).
if (!(ip && ip->isAnyAddress())) {
local_address_ = socket.localAddress();
}
}

} // namespace Network
} // namespace Envoy
30 changes: 30 additions & 0 deletions source/common/network/base_listener_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include "envoy/network/listener.h"

#include "common/event/dispatcher_impl.h"
#include "common/event/libevent.h"
#include "common/network/listen_socket_impl.h"

#include "event2/event.h"

namespace Envoy {
namespace Network {

/**
* Base libevent implementation of Network::Listener.
*/
class BaseListenerImpl : public Listener {
public:
BaseListenerImpl(const Event::DispatcherImpl& dispatcher, Socket& socket);

protected:
virtual Address::InstanceConstSharedPtr getLocalAddress(int fd);

Address::InstanceConstSharedPtr local_address_;
const Event::DispatcherImpl& dispatcher_;
Socket& socket_;
};

} // namespace Network
} // namespace Envoy
49 changes: 21 additions & 28 deletions source/common/network/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@
namespace Envoy {
namespace Network {

Address::InstanceConstSharedPtr ListenerImpl::getLocalAddress(int fd) {
return Address::addressFromFd(fd);
}

void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
int remote_addr_len, void* arg) {
ListenerImpl* listener = static_cast<ListenerImpl*>(arg);
Expand Down Expand Up @@ -51,35 +47,32 @@ void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr*
listener->hand_off_restored_destination_connections_);
}

ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb,
bool bind_to_port, bool hand_off_restored_destination_connections)
: local_address_(nullptr), cb_(cb),
hand_off_restored_destination_connections_(hand_off_restored_destination_connections),
listener_(nullptr) {
const auto ip = socket.localAddress()->ip();
void ListenerImpl::setupServerSocket(const Event::DispatcherImpl& dispatcher, Socket& socket) {
listener_.reset(
evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd()));

// Only use the listen socket's local address for new connections if it is not the all hosts
// address (e.g., 0.0.0.0 for IPv4).
if (!(ip && ip->isAnyAddress())) {
local_address_ = socket.localAddress();
if (!listener_) {
throw CreateListenerException(
fmt::format("cannot listen on socket: {}", socket.localAddress()->asString()));
}

if (bind_to_port) {
listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1,
socket.ioHandle().fd()));

if (!listener_) {
throw CreateListenerException(
fmt::format("cannot listen on socket: {}", socket.localAddress()->asString()));
}
if (!Network::Socket::applyOptions(socket.options(), socket,
envoy::api::v2::core::SocketOption::STATE_LISTENING)) {
throw CreateListenerException(fmt::format("cannot set post-listen socket option on socket: {}",
socket.localAddress()->asString()));
}

if (!Network::Socket::applyOptions(socket.options(), socket,
envoy::api::v2::core::SocketOption::STATE_LISTENING)) {
throw CreateListenerException(fmt::format(
"cannot set post-listen socket option on socket: {}", socket.localAddress()->asString()));
}
evconnlistener_set_error_cb(listener_.get(), errorCallback);
}

evconnlistener_set_error_cb(listener_.get(), errorCallback);
ListenerImpl::ListenerImpl(const Event::DispatcherImpl& dispatcher, Socket& socket,
ListenerCallbacks& cb, bool bind_to_port,
bool hand_off_restored_destination_connections)
: BaseListenerImpl(dispatcher, socket), cb_(cb),
hand_off_restored_destination_connections_(hand_off_restored_destination_connections),
listener_(nullptr) {
if (bind_to_port) {
setupServerSocket(dispatcher, socket);
}
}

Expand Down
24 changes: 9 additions & 15 deletions source/common/network/listener_impl.h
Original file line number Diff line number Diff line change
@@ -1,38 +1,32 @@
#pragma once

#include "envoy/network/listener.h"

#include "common/event/dispatcher_impl.h"
#include "common/event/libevent.h"
#include "common/network/listen_socket_impl.h"

#include "event2/event.h"
#include "base_listener_impl.h"

namespace Envoy {
namespace Network {

/**
* libevent implementation of Network::Listener.
* libevent implementation of Network::Listener for TCP.
* TODO(conqerAtapple): Consider renaming the class to `TcpListenerImpl`.
*/
class ListenerImpl : public Listener {
class ListenerImpl : public BaseListenerImpl {
public:
ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb,
ListenerImpl(const Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb,
bool bind_to_port, bool hand_off_restored_destination_connections);

void disable();
void enable();
void disable() override;
void enable() override;

protected:
virtual Address::InstanceConstSharedPtr getLocalAddress(int fd);
void setupServerSocket(const Event::DispatcherImpl& dispatcher, Socket& socket);

Address::InstanceConstSharedPtr local_address_;
ListenerCallbacks& cb_;
const bool hand_off_restored_destination_connections_;

private:
static void errorCallback(evconnlistener* listener, void* context);
static void listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr,
int remote_addr_len, void* arg);
static void errorCallback(evconnlistener* listener, void* context);

Event::Libevent::ListenerPtr listener_;
};
Expand Down
Loading