-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add routing capabilities to tcp_proxy #377
Changes from 27 commits
09dd326
ed44be2
60e72a6
2ff9595
14eaed4
f655994
8641d2a
aa20a22
fa5bbd9
fa5aa3c
987bc7a
d1c75f6
7315ce3
8e746bf
5e2c2b1
becb250
2729237
d4b80ac
d833ec8
d877b8a
0b1cc69
91bd2a6
8b53016
5e6fbc4
8bc1d82
2dffce8
6eecce2
435ac47
5e46ef9
1548bce
87d9a90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -111,10 +111,19 @@ class Connection : public Event::DeferredDeletable, public FilterManager { | |
virtual bool readEnabled() PURE; | ||
|
||
/** | ||
* @return The address of the remote client | ||
* @return The address of the remote client. | ||
* For TCP connections, it is in the form tcp://a.b.c.d:port | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it might not be of the of the form tcp:// if it's a UDS connection. I would just delete that part of the comment for now. I opened #390 to clean this up and will get to this soon. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
*/ | ||
virtual const std::string& remoteAddress() PURE; | ||
|
||
/** | ||
* @return the local address of the connection. For client connections, this is the origin | ||
* address. For server connections, this is the local destination address. For server connections | ||
* it can be different from the proxy address if the downstream connection has been redirected or | ||
* the proxy is operating in transparent mode. | ||
*/ | ||
virtual const std::string& localAddress() PURE; | ||
|
||
/** | ||
* Set the buffer stats to update when the connection's read/write buffers change. Note that | ||
* for performance reasons these stats are eventually consistent and may not always accurately | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,22 +9,89 @@ | |
|
||
#include "common/common/assert.h" | ||
#include "common/json/config_schemas.h" | ||
#include "common/common/empty_string.h" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: alpha order |
||
#include "common/json/json_loader.h" | ||
|
||
namespace Filter { | ||
|
||
TcpProxyConfig::Route::Route(const Json::Object& config) { | ||
cluster_name_ = config.getString("cluster"); | ||
|
||
if (config.hasObject("source_ip_list")) { | ||
source_ips_ = Network::IpList(config.getStringArray("source_ip_list")); | ||
} | ||
|
||
if (config.hasObject("source_ports")) { | ||
Network::Utility::parsePortRangeList(config.getString("source_ports"), source_port_ranges_); | ||
} | ||
|
||
if (config.hasObject("destination_ip_list")) { | ||
destination_ips_ = Network::IpList(config.getStringArray("destination_ip_list")); | ||
} | ||
|
||
if (config.hasObject("destination_ports")) { | ||
Network::Utility::parsePortRangeList(config.getString("destination_ports"), | ||
destination_port_ranges_); | ||
} | ||
} | ||
|
||
TcpProxyConfig::TcpProxyConfig(const Json::Object& config, | ||
Upstream::ClusterManager& cluster_manager, Stats::Store& stats_store) | ||
: cluster_name_(config.getString("cluster")), | ||
stats_(generateStats(config.getString("stat_prefix"), stats_store)) { | ||
|
||
: stats_(generateStats(config.getString("stat_prefix"), stats_store)) { | ||
config.validateSchema(Json::Schema::TCP_PROXY_NETWORK_FILTER_SCHEMA); | ||
|
||
if (!cluster_manager.get(cluster_name_)) { | ||
throw EnvoyException(fmt::format("tcp proxy: unknown cluster '{}'", cluster_name_)); | ||
for (const Json::ObjectPtr& route_desc : | ||
config.getObject("route_config")->getObjectArray("routes")) { | ||
routes_.emplace_back(Route(*route_desc)); | ||
|
||
if (!cluster_manager.get(route_desc->getString("cluster"))) { | ||
throw EnvoyException(fmt::format("tcp proxy: unknown cluster '{}' in TCP route", | ||
route_desc->getString("cluster"))); | ||
} | ||
} | ||
} | ||
|
||
const std::string& TcpProxyConfig::getRouteFromEntries(Network::Connection& connection) { | ||
for (const TcpProxyConfig::Route& route : routes_) { | ||
if (!route.source_port_ranges_.empty() && | ||
!Network::Utility::portInRangeList( | ||
Network::Utility::portFromUrl(connection.remoteAddress()), route.source_port_ranges_)) { | ||
continue; // no match, try next route | ||
} | ||
|
||
if (!route.source_ips_.empty() && | ||
!route.source_ips_.contains(Network::Utility::hostFromUrl(connection.remoteAddress()))) { | ||
continue; // no match, try next route | ||
} | ||
|
||
// If the route needs to match on destination address and port but they are not available | ||
// (localAddress is empty), we skip it. The connection has a chance to match a different | ||
// route that does not depend on destination address and port. | ||
if ((!route.destination_port_ranges_.empty() || !route.destination_ips_.empty()) && | ||
connection.localAddress().empty()) { | ||
continue; | ||
} | ||
|
||
if (!route.destination_port_ranges_.empty() && | ||
!Network::Utility::portInRangeList(Network::Utility::portFromUrl(connection.localAddress()), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if localAddress() is empty, this will throw an exception, so I think for now need to check if it's not empty. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
route.destination_port_ranges_)) { | ||
continue; // no match, try next route | ||
} | ||
|
||
if (!route.destination_ips_.empty() && | ||
!route.destination_ips_.contains( | ||
Network::Utility::hostFromUrl(connection.localAddress()))) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if localAddress() is empty, this will throw an exception, so I think for now need to check if it's not empty. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
continue; // no match, try next route | ||
} | ||
|
||
// if we made it past all checks, the route matches | ||
return route.cluster_name_; | ||
} | ||
|
||
// no match, no more routes to try | ||
return EMPTY_STRING; | ||
} | ||
|
||
TcpProxy::TcpProxy(TcpProxyConfigPtr config, Upstream::ClusterManager& cluster_manager) | ||
: config_(config), cluster_manager_(cluster_manager), downstream_callbacks_(*this), | ||
upstream_callbacks_(new UpstreamCallbacks(*this)) {} | ||
|
@@ -52,6 +119,7 @@ TcpProxyStats TcpProxyConfig::generateStats(const std::string& name, Stats::Stor | |
void TcpProxy::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) { | ||
read_callbacks_ = &callbacks; | ||
conn_log_info("new tcp proxy session", read_callbacks_->connection()); | ||
config_->stats().downstream_cx_total_.inc(); | ||
read_callbacks_->connection().addConnectionCallbacks(downstream_callbacks_); | ||
read_callbacks_->connection().setBufferStats({config_->stats().downstream_cx_rx_bytes_total_, | ||
config_->stats().downstream_cx_rx_bytes_buffered_, | ||
|
@@ -60,14 +128,25 @@ void TcpProxy::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callb | |
} | ||
|
||
Network::FilterStatus TcpProxy::initializeUpstreamConnection() { | ||
Upstream::ClusterInfoPtr cluster = cluster_manager_.get(config_->clusterName()); | ||
const std::string& cluster_name = config_->getRouteFromEntries(read_callbacks_->connection()); | ||
|
||
Upstream::ClusterInfoPtr cluster = cluster_manager_.get(cluster_name); | ||
|
||
if (cluster) { | ||
conn_log_debug("Creating connection to cluster {}", read_callbacks_->connection(), | ||
cluster_name); | ||
} else { | ||
config_->stats().downstream_cx_no_route_.inc(); | ||
read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush); | ||
return Network::FilterStatus::StopIteration; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would be good to have a stat on this scenario There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently this can't happen, because you verify that all the clusters exist. If we eventually want to support CDS/RDS like constructs for tcp_proxy, we will need to deal with this, but for now I would not have this logic at all and just crash if cluster is nullptr. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The clusters in the config always exist, but the downstream connection parameters may not match any of the programmed routes (if there's no default), in which case getClusterForConnection() will return "". I think Roman's concern is valid and we should close the connection in that case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am also adding a counter for total number of connections handled by the filter and number of connections closed due to no route match |
||
} | ||
|
||
if (!cluster->resourceManager(Upstream::ResourcePriority::Default).connections().canCreate()) { | ||
cluster->stats().upstream_cx_overflow_.inc(); | ||
read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush); | ||
return Network::FilterStatus::StopIteration; | ||
} | ||
Upstream::Host::CreateConnectionData conn_info = | ||
cluster_manager_.tcpConnForCluster(config_->clusterName()); | ||
Upstream::Host::CreateConnectionData conn_info = cluster_manager_.tcpConnForCluster(cluster_name); | ||
|
||
upstream_connection_ = std::move(conn_info.connection_); | ||
read_callbacks_->upstreamHost(conn_info.host_description_); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
#include "common/common/logger.h" | ||
#include "common/json/json_loader.h" | ||
#include "common/network/filter_impl.h" | ||
#include "common/network/utility.h" | ||
|
||
namespace Filter { | ||
|
||
|
@@ -20,7 +21,9 @@ namespace Filter { | |
COUNTER(downstream_cx_rx_bytes_total) \ | ||
GAUGE (downstream_cx_rx_bytes_buffered) \ | ||
COUNTER(downstream_cx_tx_bytes_total) \ | ||
GAUGE (downstream_cx_tx_bytes_buffered) | ||
GAUGE (downstream_cx_tx_bytes_buffered) \ | ||
COUNTER(downstream_cx_total) \ | ||
COUNTER(downstream_cx_no_route) | ||
// clang-format on | ||
|
||
/** | ||
|
@@ -38,13 +41,32 @@ class TcpProxyConfig { | |
TcpProxyConfig(const Json::Object& config, Upstream::ClusterManager& cluster_manager, | ||
Stats::Store& stats_store); | ||
|
||
const std::string& clusterName() { return cluster_name_; } | ||
/** | ||
* Find out which cluster an upstream connection should be opened to based on the | ||
* parameters of a downstream connection. | ||
* @param connection supplies the parameters of the downstream connection for | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. upstream connection may be, can you fix the comment There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment is actually correct, but maybe not clear. I have clarified. |
||
* which the proxy needs to open the corresponding upstream. | ||
* @return the cluster name to be used for the upstream connection. | ||
* If no route applies, returns the empty string. | ||
*/ | ||
const std::string& getRouteFromEntries(Network::Connection& connection); | ||
|
||
const TcpProxyStats& stats() { return stats_; } | ||
|
||
private: | ||
struct Route { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move out of class, to make it extensible? similar to weighted clusters use case? [it can be done later as well. But a TODO might help keep track of this]. @mattklein123 WDYT? Weighted clusters for tcp could follow a similar implementation like HTTP. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @rshriram I agree that ultimately we will want to make this more complicated, and probably use RDS, etc., but for now in the interest of time I think it's OK to keep this here since it's so simple. We can refactor it out in a future change. |
||
Route(const Json::Object& config); | ||
|
||
Network::IpList source_ips_; | ||
Network::PortRangeList source_port_ranges_; | ||
Network::IpList destination_ips_; | ||
Network::PortRangeList destination_port_ranges_; | ||
std::string cluster_name_; | ||
}; | ||
|
||
static TcpProxyStats generateStats(const std::string& name, Stats::Store& store); | ||
|
||
std::string cluster_name_; | ||
std::vector<Route> routes_; | ||
const TcpProxyStats stats_; | ||
}; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'd probably put here those optional config settings here as well, so one checking this template file can quickly get a sense of all options.
will leave up to you.
These configs are loaded as part of Envoy tests as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I had the impression that these templates were used for generating actual configs. If they are just loaded during Envoy tests I will add them so that the parsing logic gets exercised.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't find a good real-world example for this because in configgen.py the tcp_proxy instances and the clusters are coupled (since there used to be a 1:1 mapping). Let me think a bit more about this while you guys review the new version.