Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

udp_proxy: scaffolding #8883

Merged
merged 14 commits into from
Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,4 @@ extensions/filters/common/original_src @snowp @klarose
/*/extensions/filters/network/ext_authz @gsagula @dio
/*/extensions/filters/network/tcp_proxy @alyssawilk @zuercher
/*/extensions/filters/network/echo @htuch @alyssawilk
/*/extensions/filters/udp/udp_proxy @mattklein123 @danzh2010
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ message TcpProxy {
option (validate.required) = true;

// The upstream cluster to connect to.
//
string cluster = 2;

// Multiple upstream clusters can be specified for a given route. The
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ message TcpProxy {
option (validate.required) = true;

// The upstream cluster to connect to.
//
string cluster = 2;

// Multiple upstream clusters can be specified for a given route. The
Expand Down
7 changes: 7 additions & 0 deletions api/envoy/config/filter/udp/udp_proxy/v2alpha/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# DO NOT EDIT. This file is generated by tools/proto_sync.py.

load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")

licenses(["notice"]) # Apache 2

api_proto_package()
22 changes: 22 additions & 0 deletions api/envoy/config/filter/udp/udp_proxy/v2alpha/udp_proxy.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
syntax = "proto3";

package envoy.config.filter.udp.udp_proxy.v2alpha;

option java_outer_classname = "UdpProxyProto";
option java_multiple_files = true;
option java_package = "io.envoyproxy.envoy.config.filter.udp.udp_proxy.v2alpha";

import "google/protobuf/duration.proto";

import "validate/validate.proto";

// TODO(mattklein123): docs

message UdpProxyConfig {
oneof cluster_specifier {
option (validate.required) = true;

// The upstream cluster to connect to.
string cluster = 1 [(validate.rules).string = {min_bytes: 1}];
mattklein123 marked this conversation as resolved.
Show resolved Hide resolved
}
}
3 changes: 1 addition & 2 deletions include/envoy/network/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,8 @@ class FilterChainFactory {
*
* @param udp_listener supplies the listener to create the chain on.
* @param callbacks supplies the callbacks needed to create a filter.
* @return true if filter chain was created successfully. Otherwise false.
*/
virtual bool createUdpListenerFilterChain(UdpListenerFilterManager& udp_listener,
virtual void createUdpListenerFilterChain(UdpListenerFilterManager& udp_listener,
UdpReadFilterCallbacks& callbacks) PURE;
};

Expand Down
30 changes: 18 additions & 12 deletions include/envoy/network/listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,22 +127,28 @@ class ListenerCallbacks {
/**
* Utility struct that encapsulates the information from a udp socket's
* recvfrom/recvmmsg call.
*
* TODO(conqerAtapple): Maybe this belongs inside the UdpListenerCallbacks
* class.
*/
struct UdpRecvData {
Address::InstanceConstSharedPtr local_address_;
Address::InstanceConstSharedPtr peer_address_; // TODO(conquerAtapple): Fix ownership semantics.
struct LocalPeerAddresses {
bool operator==(const LocalPeerAddresses& rhs) const {
// TODO(mattklein123): Implement a hash directly on Address that does not use strings.
return local_->asStringView() == rhs.local_->asStringView() &&
peer_->asStringView() == rhs.peer_->asStringView();
}

template <typename H> friend H AbslHashValue(H h, const LocalPeerAddresses& addresses) {
// TODO(mattklein123): Implement a hash directly on Address that does not use strings.
return H::combine(std::move(h), addresses.local_->asStringView(),
addresses.peer_->asStringView());
}

Address::InstanceConstSharedPtr local_;
Address::InstanceConstSharedPtr peer_;
};

LocalPeerAddresses addresses_;
Buffer::InstancePtr buffer_;
MonotonicTime receive_time_;

// TODO(conquerAtapple):
// Add UdpReader here so that the callback handler can
// then use the reader to do multiple reads(recvmmsg) once the OS notifies it
// has data. We could also just return a `ReaderFactory` that returns either a
// `recvfrom` reader (with peer information) or a `read/recvmmsg` reader. This
// is still being flushed out (Jan, 2019).
};

/**
Expand Down
1 change: 1 addition & 0 deletions source/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ envoy_cc_library(
deps = [
":address_lib",
"//include/envoy/network:connection_interface",
"//include/envoy/network:listener_interface",
"//include/envoy/stats:stats_interface",
"//source/common/api:os_sys_calls_lib",
"//source/common/buffer:buffer_lib",
Expand Down
57 changes: 14 additions & 43 deletions source/common/network/udp_listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,50 +68,24 @@ void UdpListenerImpl::onSocketEvent(short flags) {

void UdpListenerImpl::handleReadCallback() {
ENVOY_UDP_LOG(trace, "handleReadCallback");
do {
uint32_t old_packets_dropped = packets_dropped_;
MonotonicTime receive_time = time_source_.monotonicTime();
Api::IoCallUint64Result result =
Utility::readFromSocket(socket_, *this, receive_time, &packets_dropped_);

if (!result.ok()) {
// No more to read or encountered a system error.
if (result.err_->getErrorCode() != Api::IoError::IoErrorCode::Again) {
ENVOY_UDP_LOG(error, "recvmsg result {}: {}", static_cast<int>(result.err_->getErrorCode()),
result.err_->getErrorDetails());
cb_.onReceiveError(UdpListenerCallbacks::ErrorCode::SyscallError,
result.err_->getErrorCode());
}
// Stop reading.
return;
}

if (result.rc_ == 0) {
// TODO(conqerAtapple): Is zero length packet interesting? If so add stats
// for it. Otherwise remove the warning log below.
ENVOY_UDP_LOG(trace, "received 0-length packet");
}

if (packets_dropped_ != old_packets_dropped) {
// The kernel tracks SO_RXQ_OVFL as a uint32 which can overflow to a smaller
// value. So as long as this count differs from previously recorded value,
// more packets are dropped by kernel.
uint32_t delta = (packets_dropped_ > old_packets_dropped)
? (packets_dropped_ - old_packets_dropped)
: (packets_dropped_ +
(std::numeric_limits<uint32_t>::max() - old_packets_dropped) + 1);
// TODO(danzh) add stats for this.
ENVOY_UDP_LOG(debug, "Kernel dropped {} more packets. Consider increase receive buffer size.",
delta);
}
} while (true);
const Api::IoErrorPtr result = Utility::readPacketsFromSocket(
socket_.ioHandle(), *socket_.localAddress(), *this, time_source_, packets_dropped_);
// TODO(mattklein123): Handle no error when we limit the number of packets read.
if (result->getErrorCode() != Api::IoError::IoErrorCode::Again) {
ENVOY_UDP_LOG(error, "recvmsg result {}: {}", static_cast<int>(result->getErrorCode()),
result->getErrorDetails());
cb_.onReceiveError(UdpListenerCallbacks::ErrorCode::SyscallError, result->getErrorCode());
}
}

void UdpListenerImpl::processPacket(Address::InstanceConstSharedPtr local_address,
Address::InstanceConstSharedPtr peer_address,
Buffer::InstancePtr buffer, MonotonicTime receive_time) {
UdpRecvData recvData{std::move(local_address), std::move(peer_address), std::move(buffer),
receive_time};
// UDP listeners are always configured with the socket option that allows pulling the local
// address. This should never be null.
ASSERT(local_address != nullptr);
UdpRecvData recvData{
{std::move(local_address), std::move(peer_address)}, std::move(buffer), receive_time};
cb_.onData(recvData);
}

Expand All @@ -129,11 +103,8 @@ const Address::InstanceConstSharedPtr& UdpListenerImpl::localAddress() const {
Api::IoCallUint64Result UdpListenerImpl::send(const UdpSendData& send_data) {
ENVOY_UDP_LOG(trace, "send");
Buffer::Instance& buffer = send_data.buffer_;
uint64_t num_slices = buffer.getRawSlices(nullptr, 0);
STACK_ARRAY(slices, Buffer::RawSlice, num_slices);
buffer.getRawSlices(slices.begin(), num_slices);
Api::IoCallUint64Result send_result = Utility::writeToSocket(
socket_, slices.begin(), num_slices, send_data.local_ip_, send_data.peer_address_);
socket_.ioHandle(), buffer, send_data.local_ip_, send_data.peer_address_);

// The send_result normalizes the rc_ value to 0 in error conditions.
// The drain call is hence 'safe' in success and failure cases.
Expand Down
81 changes: 63 additions & 18 deletions source/common/network/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "common/common/assert.h"
#include "common/common/cleanup.h"
#include "common/common/fmt.h"
#include "common/common/stack_array.h"
#include "common/common/utility.h"
#include "common/network/address_impl.h"
#include "common/network/io_socket_error_impl.h"
Expand Down Expand Up @@ -230,14 +231,13 @@ Address::InstanceConstSharedPtr Utility::getLocalAddress(const Address::IpVersio
return ret;
}

bool Utility::isLocalConnection(const Network::ConnectionSocket& socket) {
bool Utility::isLocalConnection(const ConnectionSocket& socket) {
// These are local:
// - Pipes
// - Sockets to a loopback address
// - Sockets where the local and remote address (ignoring port) are the same
const auto& remote_address = socket.remoteAddress();
if (remote_address->type() == Envoy::Network::Address::Type::Pipe ||
isLoopbackAddress(*remote_address)) {
if (remote_address->type() == Address::Type::Pipe || isLoopbackAddress(*remote_address)) {
return true;
}
const auto local_ip = socket.localAddress()->ip();
Expand Down Expand Up @@ -329,9 +329,9 @@ const std::string& Utility::getIpv6CidrCatchAllAddress() {
Address::InstanceConstSharedPtr Utility::getAddressWithPort(const Address::Instance& address,
uint32_t port) {
switch (address.ip()->version()) {
case Network::Address::IpVersion::v4:
case Address::IpVersion::v4:
return std::make_shared<Address::Ipv4Instance>(address.ip()->addressAsString(), port);
case Network::Address::IpVersion::v6:
case Address::IpVersion::v6:
return std::make_shared<Address::Ipv6Instance>(address.ip()->addressAsString(), port);
}
NOT_REACHED_GCOVR_EXCL_LINE;
Expand Down Expand Up @@ -413,7 +413,7 @@ bool Utility::portInRangeList(const Address::Instance& address, const std::list<
return false;
}

for (const Network::PortRange& p : list) {
for (const PortRange& p : list) {
if (p.contains(address.ip()->port())) {
return true;
}
Expand Down Expand Up @@ -450,9 +450,9 @@ Address::InstanceConstSharedPtr
Utility::protobufAddressToAddress(const envoy::api::v2::core::Address& proto_address) {
switch (proto_address.address_case()) {
case envoy::api::v2::core::Address::kSocketAddress:
return Network::Utility::parseInternetAddress(proto_address.socket_address().address(),
proto_address.socket_address().port_value(),
!proto_address.socket_address().ipv4_compat());
return Utility::parseInternetAddress(proto_address.socket_address().address(),
proto_address.socket_address().port_value(),
!proto_address.socket_address().ipv4_compat());
case envoy::api::v2::core::Address::kPipe:
return std::make_shared<Address::PipeInstance>(proto_address.pipe().path());
default:
Expand Down Expand Up @@ -493,13 +493,22 @@ Utility::protobufAddressSocketType(const envoy::api::v2::core::Address& proto_ad
}
}

Api::IoCallUint64Result Utility::writeToSocket(Network::Socket& socket, Buffer::RawSlice* slices,
Api::IoCallUint64Result Utility::writeToSocket(IoHandle& handle, const Buffer::Instance& buffer,
const Address::Ip* local_ip,
const Address::Instance& peer_address) {
const uint64_t num_slices = buffer.getRawSlices(nullptr, 0);
STACK_ARRAY(slices, Buffer::RawSlice, num_slices);
buffer.getRawSlices(slices.begin(), num_slices);
return writeToSocket(handle, slices.begin(), num_slices, local_ip, peer_address);
}

Api::IoCallUint64Result Utility::writeToSocket(IoHandle& handle, Buffer::RawSlice* slices,
uint64_t num_slices, const Address::Ip* local_ip,
const Address::Instance& peer_address) {
Api::IoCallUint64Result send_result(
/*rc=*/0, /*err=*/Api::IoErrorPtr(nullptr, Network::IoSocketError::deleteIoError));
/*rc=*/0, /*err=*/Api::IoErrorPtr(nullptr, IoSocketError::deleteIoError));
do {
send_result = socket.ioHandle().sendmsg(slices, num_slices, 0, local_ip, peer_address);
send_result = handle.sendmsg(slices, num_slices, 0, local_ip, peer_address);
} while (!send_result.ok() &&
// Send again if interrupted.
send_result.err_->getErrorCode() == Api::IoError::IoErrorCode::Interrupt);
Expand All @@ -514,7 +523,8 @@ Api::IoCallUint64Result Utility::writeToSocket(Network::Socket& socket, Buffer::
return send_result;
}

Api::IoCallUint64Result Utility::readFromSocket(Network::Socket& socket,
Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle,
const Address::Instance& local_address,
UdpPacketProcessor& udp_packet_processor,
MonotonicTime receive_time,
uint32_t* packets_dropped) {
Expand All @@ -525,14 +535,12 @@ Api::IoCallUint64Result Utility::readFromSocket(Network::Socket& socket,

IoHandle::RecvMsgOutput output(packets_dropped);
Api::IoCallUint64Result result =
socket.ioHandle().recvmsg(&slice, num_slices, socket.localAddress()->ip()->port(), output);
handle.recvmsg(&slice, num_slices, local_address.ip()->port(), output);

if (!result.ok()) {
return result;
}

RELEASE_ASSERT(output.local_address_ != nullptr, "fail to get local address from IP header");

// Adjust used memory length.
slice.len_ = std::min(slice.len_, static_cast<size_t>(result.rc_));
buffer->commit(&slice, 1);
Expand All @@ -541,19 +549,56 @@ Api::IoCallUint64Result Utility::readFromSocket(Network::Socket& socket,

RELEASE_ASSERT(output.peer_address_ != nullptr,
fmt::format("Unable to get remote address for fd: {}, local address: {} ",
socket.ioHandle().fd(), socket.localAddress()->asString()));
handle.fd(), local_address.asString()));

// Unix domain sockets are not supported
RELEASE_ASSERT(output.peer_address_->type() == Address::Type::Ip,
fmt::format("Unsupported remote address: {} local address: {}, receive size: "
"{}",
output.peer_address_->asString(), socket.localAddress()->asString(),
output.peer_address_->asString(), local_address.asString(),
result.rc_));
udp_packet_processor.processPacket(std::move(output.local_address_),
std::move(output.peer_address_), std::move(buffer),
receive_time);
return result;
}

Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle,
const Address::Instance& local_address,
UdpPacketProcessor& udp_packet_processor,
TimeSource& time_source, uint32_t& packets_dropped) {
do {
const uint32_t old_packets_dropped = packets_dropped;
const MonotonicTime receive_time = time_source.monotonicTime();
Api::IoCallUint64Result result = Utility::readFromSocket(
handle, local_address, udp_packet_processor, receive_time, &packets_dropped);

if (!result.ok()) {
// No more to read or encountered a system error.
return std::move(result.err_);
}

if (result.rc_ == 0) {
// TODO(conqerAtapple): Is zero length packet interesting? If so add stats
// for it. Otherwise remove the warning log below.
ENVOY_LOG_MISC(trace, "received 0-length packet");
}

if (packets_dropped != old_packets_dropped) {
// The kernel tracks SO_RXQ_OVFL as a uint32 which can overflow to a smaller
// value. So as long as this count differs from previously recorded value,
// more packets are dropped by kernel.
const uint32_t delta =
(packets_dropped > old_packets_dropped)
? (packets_dropped - old_packets_dropped)
: (packets_dropped + (std::numeric_limits<uint32_t>::max() - old_packets_dropped) +
1);
// TODO(danzh) add stats for this.
ENVOY_LOG_MISC(
debug, "Kernel dropped {} more packets. Consider increase receive buffer size.", delta);
}
} while (true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattklein123 this loop runs until we run out of packets; do you think we should have some bound on the loop to ensure fairness across epoll events? This may be a possible DoS vector once UDP becomes a thing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes @danzh2010 and I have already discussed thi and there are various TODOs in the code. Someone will be doing this as a follow up once the main support for UDP and QUIC merges.

}

} // namespace Network
} // namespace Envoy
Loading