Skip to content

Commit

Permalink
Add routing capabilities to tcp_proxy
Browse files Browse the repository at this point in the history
Allows the tcp_proxy filter to pick the destination cluster based
on a combination of L4 connection parameters (source/destination
IP address/port)

See envoyproxy#345
  • Loading branch information
Enrico Schiattarella committed Jan 25, 2017
1 parent d4b80ac commit d833ec8
Show file tree
Hide file tree
Showing 23 changed files with 614 additions and 67 deletions.
10 changes: 8 additions & 2 deletions configs/envoy_service_to_service.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,14 @@
"type": "read",
"name": "tcp_proxy",
"config": {
"cluster": "mongo_{{ key }}",
"stat_prefix": "mongo_{{ key }}"
"stat_prefix": "mongo_{{ key }}",
"route_config": {
"routes": [
{
"cluster": "mongo_{{ key }}"
}
]
}
}
}]
}{% if not loop.last %},{% endif -%}
Expand Down
14 changes: 9 additions & 5 deletions docs/configuration/network_filters/tcp_proxy_filter.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ TCP proxy :ref:`architecture overview <arch_overview_tcp_proxy>`.
"type": "read",
"name": "tcp_proxy",
"config": {
"cluster": "...",
"stat_prefix": "..."
"stat_prefix": "...",
"route_config": "{...}"
}
}
cluster
*(required, string)* The :ref:`cluster manager <arch_overview_cluster_manager>` cluster to connect
to when a new downstream network connection is received.
:ref:`route_config <config_network_filters_tcp_proxy_route_config>`
*(required, object)* The route table for the filter. All filter instances must have a route table, even if it is empty.

stat_prefix
*(required, string)* The prefix to use when emitting :ref:`statistics
Expand All @@ -39,3 +38,8 @@ statistics are rooted at *tcp.<stat_prefix>.* with the following statistics:

downstream_cx_tx_bytes_total, Counter, Total bytes written to the downstream connection.
downstream_cx_tx_bytes_buffered, Gauge, Total bytes currently buffered to the downstream connection.

.. toctree::
:hidden:

tcp_proxy_filter_route_config
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
.. _config_network_filters_tcp_proxy_route_config:

Route Configuration
===================

* TCP proxy :ref:`architecture overview <arch_overview_tcp_proxy>`.
* TCP proxy :ref:`filter <config_network_filters_tcp_proxy>`.

.. code-block:: json
{
"routes": []
}
:ref:`routes <config_network_filters_tcp_proxy_route>`
*(required, array)* An array of route entries that make up the route table.

.. toctree::
:hidden:

tcp_proxy_filter_route
19 changes: 19 additions & 0 deletions include/envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,25 @@ class Connection : public Event::DeferredDeletable, public FilterManager {
*/
virtual const std::string& remoteAddress() PURE;

/**
* @return The port number used by the remote client.
*/
virtual uint32_t remotePort() PURE;

/**
* @return The address the remote client is trying to connect to.
* It can be different from the proxy address if the downstream connection
* has been redirected or the proxy is operating in transparent mode.
*/
virtual const std::string destinationAddress() PURE;

/**
* @return The port number the remote client is trying to connect to.
* It can be different from the port the listener is listening on if the connection
* has been redirected or the proxy is operating in transparent mode.
*/
virtual uint32_t destinationPort() PURE;

/**
* Set the buffer stats to update when the connection's read/write buffers change. Note that
* for performance reasons these stats are eventually consistent and may not always accurately
Expand Down
90 changes: 84 additions & 6 deletions source/common/filter/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,84 @@
#include "envoy/upstream/upstream.h"

#include "common/common/assert.h"
#include "common/common/empty_string.h"
#include "common/json/json_loader.h"

namespace Filter {

TcpProxyConfig::Route::Route(const Json::Object& config) {
if (config.hasObject("cluster")) {
cluster_name_ = config.getString("cluster");
} else {
throw EnvoyException(fmt::format("tcp proxy: route without cluster"));
}

if (config.hasObject("source_ip_list")) {
source_ips_ = Network::IpList(config.getStringArray("source_ip_list"));
}

if (config.hasObject("source_ports")) {
Network::Utility::parsePortRangeList(config.getString("source_ports"), source_port_ranges_);
}

if (config.hasObject("destination_ip_list")) {
destination_ips_ = Network::IpList(config.getStringArray("destination_ip_list"));
}

if (config.hasObject("destination_ports")) {
Network::Utility::parsePortRangeList(config.getString("destination_ports"),
destination_port_ranges_);
}
}

TcpProxyConfig::TcpProxyConfig(const Json::Object& config,
Upstream::ClusterManager& cluster_manager, Stats::Store& stats_store)
: cluster_name_(config.getString("cluster")),
stats_(generateStats(config.getString("stat_prefix"), stats_store)) {
if (!cluster_manager.get(cluster_name_)) {
throw EnvoyException(fmt::format("tcp proxy: unknown cluster '{}'", cluster_name_));
: stats_(generateStats(config.getString("stat_prefix"), stats_store)) {
if (!config.hasObject("route_config")) {
throw EnvoyException(fmt::format("tcp proxy: missing route config"));
}

for (const Json::ObjectPtr& route_desc :
config.getObject("route_config")->getObjectArray("routes")) {
routes_.emplace_back(Route(*route_desc));

if (!cluster_manager.get(route_desc->getString("cluster"))) {
throw EnvoyException(fmt::format("tcp proxy: unknown cluster '{}' in TCP route",
route_desc->getString("cluster")));
}
}
}

const std::string& TcpProxyConfig::getClusterForConnection(Network::Connection& connection) {
for (const TcpProxyConfig::Route& route : routes_) {
if (!route.source_port_ranges_.empty() &&
!Network::Utility::portInRangeList(connection.remotePort(), route.source_port_ranges_)) {
continue; // no match, try next route
}

if (!route.source_ips_.empty() && !route.source_ips_.contains(connection.remoteAddress())) {
continue; // no match, try next route
}

if (!route.destination_port_ranges_.empty() &&
!Network::Utility::portInRangeList(connection.destinationPort(),
route.destination_port_ranges_)) {
continue; // no match, try next route
}

if (!route.destination_ips_.empty() &&
!route.destination_ips_.contains(connection.destinationAddress())) {
continue; // no match, try next route
}

// if we made it past all checks, the route matches
return route.cluster_name_;
}

// no match, no more routes to try
return EMPTY_STRING;
}

TcpProxy::TcpProxy(TcpProxyConfigPtr config, Upstream::ClusterManager& cluster_manager)
: config_(config), cluster_manager_(cluster_manager), downstream_callbacks_(*this),
upstream_callbacks_(new UpstreamCallbacks(*this)) {}
Expand Down Expand Up @@ -56,14 +121,27 @@ void TcpProxy::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callb
}

Network::FilterStatus TcpProxy::initializeUpstreamConnection() {
Upstream::ClusterInfoPtr cluster = cluster_manager_.get(config_->clusterName());
const std::string& destination_cluster =
config_->getClusterForConnection(read_callbacks_->connection());
conn_log_debug("Connection from {}", read_callbacks_->connection(), destination_cluster);

Upstream::ClusterInfoPtr cluster = cluster_manager_.get(destination_cluster);
if (cluster) {
conn_log_debug("Connection cluster with name {} found", read_callbacks_->connection(),
destination_cluster);
} else {
conn_log_debug("Connection cluster with name {} NOT FOUND", read_callbacks_->connection(),
destination_cluster);
return Network::FilterStatus::StopIteration;
}

if (!cluster->resourceManager(Upstream::ResourcePriority::Default).connections().canCreate()) {
cluster->stats().upstream_cx_overflow_.inc();
read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush);
return Network::FilterStatus::StopIteration;
}
Upstream::Host::CreateConnectionData conn_info =
cluster_manager_.tcpConnForCluster(config_->clusterName());
cluster_manager_.tcpConnForCluster(destination_cluster);

upstream_connection_ = std::move(conn_info.connection_);
read_callbacks_->upstreamHost(conn_info.host_description_);
Expand Down
16 changes: 14 additions & 2 deletions source/common/filter/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "common/common/logger.h"
#include "common/json/json_loader.h"
#include "common/network/filter_impl.h"
#include "common/network/utility.h"

namespace Filter {

Expand Down Expand Up @@ -38,13 +39,24 @@ class TcpProxyConfig {
TcpProxyConfig(const Json::Object& config, Upstream::ClusterManager& cluster_manager,
Stats::Store& stats_store);

const std::string& clusterName() { return cluster_name_; }
const std::string& getClusterForConnection(Network::Connection& connection);

const TcpProxyStats& stats() { return stats_; }

private:
struct Route {
Route(const Json::Object& config);

Network::IpList source_ips_;
Network::PortRangeList source_port_ranges_;
Network::IpList destination_ips_;
Network::PortRangeList destination_port_ranges_;
std::string cluster_name_;
};

static TcpProxyStats generateStats(const std::string& name, Stats::Store& store);

std::string cluster_name_;
std::list<Route> routes_;
const TcpProxyStats stats_;
};

Expand Down
42 changes: 35 additions & 7 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "connection_impl.h"
#include "utility.h"

#include "envoy/event/timer.h"
#include "envoy/common/exception.h"
Expand Down Expand Up @@ -33,9 +34,9 @@ void ConnectionImplUtility::updateBufferStats(uint64_t delta, uint64_t new_total
std::atomic<uint64_t> ConnectionImpl::next_global_id_;

ConnectionImpl::ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd,
const std::string& remote_address)
: filter_manager_(*this, *this), remote_address_(remote_address), dispatcher_(dispatcher),
fd_(fd), id_(++next_global_id_) {
const std::string& remote_address, uint32_t remote_port)
: filter_manager_(*this, *this), remote_address_(remote_address), remote_port_(remote_port),
dispatcher_(dispatcher), fd_(fd), id_(++next_global_id_) {

// Treat the lack of a valid fd (which in practice only happens if we run out of FDs) as an OOM
// condition and just crash.
Expand Down Expand Up @@ -395,9 +396,35 @@ void ConnectionImpl::updateWriteBufferStats(uint64_t num_written, uint64_t new_s
buffer_stats_->write_current_);
}

const std::string Network::ConnectionImpl::destinationAddress() {
if (fd_ != -1) {
sockaddr_storage orig_dst_addr;
memset(&orig_dst_addr, 0, sizeof(orig_dst_addr));
bool success = Utility::getOriginalDst(fd_, &orig_dst_addr);
if (success) {
return Utility::getAddressName(reinterpret_cast<struct sockaddr_in*>(&orig_dst_addr));
}
}

return EMPTY_STRING;
}

uint32_t Network::ConnectionImpl::destinationPort() {
if (fd_ != -1) {
sockaddr_storage orig_dst_addr;
memset(&orig_dst_addr, 0, sizeof(orig_dst_addr));
bool success = Utility::getOriginalDst(fd_, &orig_dst_addr);
if (success) {
return Utility::getAddressPort(reinterpret_cast<struct sockaddr_in*>(&orig_dst_addr));
}
}

return 0;
}

ClientConnectionImpl::ClientConnectionImpl(Event::DispatcherImpl& dispatcher, int fd,
const std::string& url)
: ConnectionImpl(dispatcher, fd, url) {}
const std::string& url, uint32_t port)
: ConnectionImpl(dispatcher, fd, url, port) {}

Network::ClientConnectionPtr ClientConnectionImpl::create(Event::DispatcherImpl& dispatcher,
const std::string& url) {
Expand All @@ -412,7 +439,8 @@ Network::ClientConnectionPtr ClientConnectionImpl::create(Event::DispatcherImpl&

TcpClientConnectionImpl::TcpClientConnectionImpl(Event::DispatcherImpl& dispatcher,
const std::string& url)
: ClientConnectionImpl(dispatcher, socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0), url) {}
: ClientConnectionImpl(dispatcher, socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0), url,
Network::Utility::portFromUrl(url)) {}

void TcpClientConnectionImpl::connect() {
AddrInfoPtr addr_info = Utility::resolveTCP(Utility::hostFromUrl(remote_address_),
Expand All @@ -422,7 +450,7 @@ void TcpClientConnectionImpl::connect() {

UdsClientConnectionImpl::UdsClientConnectionImpl(Event::DispatcherImpl& dispatcher,
const std::string& url)
: ClientConnectionImpl(dispatcher, socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0), url) {}
: ClientConnectionImpl(dispatcher, socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0), url, 0) {}

void UdsClientConnectionImpl::connect() {
sockaddr_un addr = Utility::resolveUnixDomainSocket(Utility::pathFromUrl(remote_address_));
Expand Down
10 changes: 8 additions & 2 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class ConnectionImpl : public virtual Connection,
public BufferSource,
protected Logger::Loggable<Logger::Id::connection> {
public:
ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd, const std::string& remote_address);
ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd, const std::string& remote_address,
uint32_t remote_port);
~ConnectionImpl();

// Network::FilterManager
Expand All @@ -56,6 +57,9 @@ class ConnectionImpl : public virtual Connection,
void readDisable(bool disable) override;
bool readEnabled() override;
const std::string& remoteAddress() override { return remote_address_; }
uint32_t remotePort() override { return remote_port_; };
const std::string destinationAddress() override;
uint32_t destinationPort() override;
void setBufferStats(const BufferStats& stats) override;
Ssl::Connection* ssl() override { return nullptr; }
State state() override;
Expand All @@ -79,6 +83,7 @@ class ConnectionImpl : public virtual Connection,

FilterManagerImpl filter_manager_;
const std::string remote_address_;
uint32_t remote_port_;
Buffer::OwnedImpl read_buffer_;
Buffer::OwnedImpl write_buffer_;

Expand Down Expand Up @@ -122,7 +127,8 @@ class ConnectionImpl : public virtual Connection,
*/
class ClientConnectionImpl : public ConnectionImpl, virtual public ClientConnection {
public:
ClientConnectionImpl(Event::DispatcherImpl& dispatcher, int fd, const std::string& url);
ClientConnectionImpl(Event::DispatcherImpl& dispatcher, int fd, const std::string& url,
uint32_t port);

static Network::ClientConnectionPtr create(Event::DispatcherImpl& dispatcher,
const std::string& url);
Expand Down
14 changes: 8 additions & 6 deletions source/common/network/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,25 +77,27 @@ void ListenerImpl::newConnection(int fd, sockaddr* addr) {
if (use_proxy_proto_) {
proxy_protocol_.newConnection(dispatcher_, fd, *this);
} else {
newConnection(fd, getAddressName(addr));
newConnection(fd, getAddressName(addr), getAddressPort(addr));
}
}

void ListenerImpl::newConnection(int fd, const std::string& remote_address) {
ConnectionPtr new_connection(new ConnectionImpl(dispatcher_, fd, remote_address));
void ListenerImpl::newConnection(int fd, const std::string& remote_address, uint32_t remote_port) {
ConnectionPtr new_connection(new ConnectionImpl(dispatcher_, fd, remote_address, remote_port));
cb_.onNewConnection(std::move(new_connection));
}

void SslListenerImpl::newConnection(int fd, sockaddr* addr) {
if (use_proxy_proto_) {
proxy_protocol_.newConnection(dispatcher_, fd, *this);
} else {
newConnection(fd, getAddressName(addr));
newConnection(fd, getAddressName(addr), getAddressPort(addr));
}
}

void SslListenerImpl::newConnection(int fd, const std::string& remote_address) {
ConnectionPtr new_connection(new Ssl::ConnectionImpl(dispatcher_, fd, remote_address, ssl_ctx_,
void SslListenerImpl::newConnection(int fd, const std::string& remote_address,
uint32_t remote_port) {
ConnectionPtr new_connection(new Ssl::ConnectionImpl(dispatcher_, fd, remote_address, remote_port,
ssl_ctx_,
Ssl::ConnectionImpl::InitialState::Server));
cb_.onNewConnection(std::move(new_connection));
}
Expand Down
Loading

0 comments on commit d833ec8

Please sign in to comment.