Add routing capabilities to tcp_proxy #377
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -115,6 +115,13 @@ class Connection : public Event::DeferredDeletable, public FilterManager { | |
*/ | ||
virtual const std::string& remoteAddress() PURE; | ||
|
||
/** | ||
* @return The address the remote client is trying to connect to. | ||
Review comment: This comment is still confusing from an interface perspective. I think it should read as follows (this would also eventually allow us to use it for logging client connection outbound addresses, for example): "@return the local address of the connection. For client connections, this is the origin address. For server connections, this is the local destination address. For server connections it can be different from the proxy address if the downstream connection has been redirected or the proxy is operating in transparent mode."
Reply: ok
||
* It can be different from the proxy address if the downstream connection | ||
* has been redirected or the proxy is operating in transparent mode. | ||
*/ | ||
virtual const std::string destinationAddress() PURE; | ||
Review comment: In thinking about this more, this is still confusing, as this is a base class for both client and server connections. Here is what I would do, which shouldn't be too hard: rename this function to localAddress(). For server connections, the local address would be the incoming address, and for client connections the local address would be the origin address. I think this makes sense. I will comment on some implementation stuff below.
Reply: Yes, that makes sense. The only issue is that the name becomes inaccurate when the connection is intercepted. It's probably ok since the comment above mentions that explicitly.
||
|
||
/** | ||
* Set the buffer stats to update when the connection's read/write buffers change. Note that | ||
* for performance reasons these stats are eventually consistent and may not always accurately | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,20 +9,84 @@ | |
|
||
#include "common/common/assert.h" | ||
#include "common/json/config_schemas.h" | ||
#include "common/common/empty_string.h" | ||
Review comment: nit: alpha order
||
#include "common/json/json_loader.h" | ||
|
||
namespace Filter { | ||
|
||
TcpProxyConfig::Route::Route(const Json::Object& config) { | ||
if (config.hasObject("cluster")) { | ||
cluster_name_ = config.getString("cluster"); | ||
} else { | ||
throw EnvoyException("tcp proxy: route without cluster"); | ||
Review comment: With schema support now, can this actually happen? I don't think it can, so this should probably be deleted.
Reply: right, will remove
||
} | ||
|
||
if (config.hasObject("source_ip_list")) { | ||
source_ips_ = Network::IpList(config.getStringArray("source_ip_list")); | ||
} | ||
|
||
if (config.hasObject("source_ports")) { | ||
Network::Utility::parsePortRangeList(config.getString("source_ports"), source_port_ranges_); | ||
} | ||
|
||
if (config.hasObject("destination_ip_list")) { | ||
destination_ips_ = Network::IpList(config.getStringArray("destination_ip_list")); | ||
} | ||
|
||
if (config.hasObject("destination_ports")) { | ||
Network::Utility::parsePortRangeList(config.getString("destination_ports"), | ||
destination_port_ranges_); | ||
} | ||
} | ||
|
||
 TcpProxyConfig::TcpProxyConfig(const Json::Object& config,
                                Upstream::ClusterManager& cluster_manager, Stats::Store& stats_store)
-    : cluster_name_(config.getString("cluster")),
-      stats_(generateStats(config.getString("stat_prefix"), stats_store)) {
-
+    : stats_(generateStats(config.getString("stat_prefix"), stats_store)) {
   config.validateSchema(Json::Schema::TCP_PROXY_NETWORK_FILTER_SCHEMA);

-  if (!cluster_manager.get(cluster_name_)) {
-    throw EnvoyException(fmt::format("tcp proxy: unknown cluster '{}'", cluster_name_));
+  for (const Json::ObjectPtr& route_desc :
+       config.getObject("route_config")->getObjectArray("routes")) {
+    routes_.emplace_back(Route(*route_desc));
+
+    if (!cluster_manager.get(route_desc->getString("cluster"))) {
+      throw EnvoyException(fmt::format("tcp proxy: unknown cluster '{}' in TCP route",
+                                       route_desc->getString("cluster")));
+    }
   }
 }
|
||
const std::string& TcpProxyConfig::getRouteFromEntries(Network::Connection& connection) { | ||
for (const TcpProxyConfig::Route& route : routes_) { | ||
if (!route.source_port_ranges_.empty() && | ||
!Network::Utility::portInRangeList( | ||
Network::Utility::portFromUrl(connection.remoteAddress()), route.source_port_ranges_)) { | ||
continue; // no match, try next route | ||
} | ||
|
||
if (!route.source_ips_.empty() && | ||
!route.source_ips_.contains(Network::Utility::hostFromUrl(connection.remoteAddress()))) { | ||
continue; // no match, try next route | ||
} | ||
|
||
if (!route.destination_port_ranges_.empty() && | ||
!Network::Utility::portInRangeList( | ||
Network::Utility::portFromUrl(connection.destinationAddress()), | ||
route.destination_port_ranges_)) { | ||
continue; // no match, try next route | ||
} | ||
|
||
if (!route.destination_ips_.empty() && | ||
!route.destination_ips_.contains( | ||
Network::Utility::hostFromUrl(connection.destinationAddress()))) { | ||
continue; // no match, try next route | ||
} | ||
|
||
// if we made it past all checks, the route matches | ||
return route.cluster_name_; | ||
} | ||
|
||
// no match, no more routes to try | ||
return EMPTY_STRING; | ||
} | ||
|
||
TcpProxy::TcpProxy(TcpProxyConfigPtr config, Upstream::ClusterManager& cluster_manager) | ||
|
@@ -52,6 +116,7 @@ TcpProxyStats TcpProxyConfig::generateStats(const std::string& name, Stats::Stor | |
void TcpProxy::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) { | ||
read_callbacks_ = &callbacks; | ||
conn_log_info("new tcp proxy session", read_callbacks_->connection()); | ||
config_->stats().downstream_cx_total_.inc(); | ||
read_callbacks_->connection().addConnectionCallbacks(downstream_callbacks_); | ||
read_callbacks_->connection().setBufferStats({config_->stats().downstream_cx_rx_bytes_total_, | ||
config_->stats().downstream_cx_rx_bytes_buffered_, | ||
|
@@ -60,14 +125,25 @@ void TcpProxy::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callb | |
} | ||
|
||
 Network::FilterStatus TcpProxy::initializeUpstreamConnection() {
-  Upstream::ClusterInfoPtr cluster = cluster_manager_.get(config_->clusterName());
+  const std::string& cluster_name = config_->getRouteFromEntries(read_callbacks_->connection());
+
+  Upstream::ClusterInfoPtr cluster = cluster_manager_.get(cluster_name);
|
||
if (cluster) { | ||
conn_log_debug("Creating connection to cluster {}", read_callbacks_->connection(), | ||
cluster_name); | ||
} else { | ||
config_->stats().downstream_cx_no_route_.inc(); | ||
read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush); | ||
return Network::FilterStatus::StopIteration; | ||
Review comment: It would be good to have a stat on this scenario.
Review comment: Currently this can't happen, because you verify that all the clusters exist. If we eventually want to support CDS/RDS-like constructs for tcp_proxy, we will need to deal with this, but for now I would not have this logic at all and just crash if cluster is nullptr.
Reply: The clusters in the config always exist, but the downstream connection parameters may not match any of the programmed routes (if there's no default), in which case getClusterForConnection() will return "". I think Roman's concern is valid and we should close the connection in that case.
Reply: I am also adding a counter for the total number of connections handled by the filter and the number of connections closed due to no route match.
||
} | ||
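The behaviour discussed in this thread (first matching route wins, an empty cluster name means "no route", and the connection is then counted and closed) can be sketched in isolation. This is a minimal illustration only, not the PR's code: `Route`, `ConnInfo`, `Stats`, `selectCluster` and `handleNewConnection` are hypothetical stand-ins, and IP matching is reduced to exact string comparison, whereas the PR uses `Network::IpList`.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified stand-ins for the PR's types (not Envoy's API).
struct PortRange {
  uint32_t min;
  uint32_t max;
};

struct Route {
  std::vector<std::string> source_ips; // exact match only in this sketch
  std::vector<PortRange> source_ports;
  std::vector<std::string> destination_ips;
  std::vector<PortRange> destination_ports;
  std::string cluster_name;
};

struct ConnInfo {
  std::string source_ip;
  uint32_t source_port;
  std::string destination_ip;
  uint32_t destination_port;
};

struct Stats {
  uint64_t cx_total = 0;
  uint64_t cx_no_route = 0;
};

static const std::string kNoRoute; // empty string signals "no route matched"

// An empty criterion acts as a wildcard, mirroring the `!empty() && !match` checks in the diff.
bool portAllowed(const std::vector<PortRange>& ranges, uint32_t port) {
  if (ranges.empty()) return true;
  for (const PortRange& r : ranges) {
    assert(r.min <= r.max);
    if (port >= r.min && port <= r.max) return true;
  }
  return false;
}

bool ipAllowed(const std::vector<std::string>& ips, const std::string& ip) {
  if (ips.empty()) return true;
  for (const std::string& candidate : ips) {
    if (candidate == ip) return true;
  }
  return false;
}

// Routes are tried in order; the first one whose criteria all pass is selected.
const std::string& selectCluster(const std::vector<Route>& routes, const ConnInfo& conn) {
  for (const Route& route : routes) {
    if (portAllowed(route.source_ports, conn.source_port) &&
        ipAllowed(route.source_ips, conn.source_ip) &&
        portAllowed(route.destination_ports, conn.destination_port) &&
        ipAllowed(route.destination_ips, conn.destination_ip)) {
      return route.cluster_name;
    }
  }
  return kNoRoute;
}

// Returns true if the connection would be proxied, false if it would be closed.
bool handleNewConnection(const std::vector<Route>& routes, const ConnInfo& conn, Stats& stats) {
  ++stats.cx_total;
  if (selectCluster(routes, conn).empty()) {
    ++stats.cx_no_route; // no route matched: count it and drop the connection
    return false;
  }
  return true;
}
```

A route with no criteria at all matches every connection, so placing one last in the list gives a default; without such a catch-all, the no-route path above is reachable even though every configured cluster exists.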
|
||
if (!cluster->resourceManager(Upstream::ResourcePriority::Default).connections().canCreate()) { | ||
cluster->stats().upstream_cx_overflow_.inc(); | ||
read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush); | ||
return Network::FilterStatus::StopIteration; | ||
} | ||
-  Upstream::Host::CreateConnectionData conn_info =
-      cluster_manager_.tcpConnForCluster(config_->clusterName());
+  Upstream::Host::CreateConnectionData conn_info = cluster_manager_.tcpConnForCluster(cluster_name);
|
||
upstream_connection_ = std::move(conn_info.connection_); | ||
read_callbacks_->upstreamHost(conn_info.host_description_); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
#include "common/common/logger.h" | ||
#include "common/json/json_loader.h" | ||
#include "common/network/filter_impl.h" | ||
#include "common/network/utility.h" | ||
|
||
namespace Filter { | ||
|
||
|
@@ -20,7 +21,9 @@ namespace Filter { | |
COUNTER(downstream_cx_rx_bytes_total) \ | ||
GAUGE (downstream_cx_rx_bytes_buffered) \ | ||
COUNTER(downstream_cx_tx_bytes_total) \ | ||
-  GAUGE  (downstream_cx_tx_bytes_buffered)
+  GAUGE  (downstream_cx_tx_bytes_buffered) \
+  COUNTER(downstream_cx_total) \
+  COUNTER(downstream_cx_no_route)
// clang-format on | ||
|
||
/** | ||
|
@@ -38,13 +41,32 @@ class TcpProxyConfig { | |
TcpProxyConfig(const Json::Object& config, Upstream::ClusterManager& cluster_manager, | ||
Stats::Store& stats_store); | ||
|
||
-  const std::string& clusterName() { return cluster_name_; }
/** | ||
* Find out which cluster an upstream connection should be opened to based on the | ||
* parameters of a downstream connection. | ||
* @param connection supplies the parameters of the downstream connection for | ||
Review comment: upstream connection, maybe? Can you fix the comment?
Reply: The comment is actually correct, but maybe not clear. I have clarified.
||
* which the proxy needs to open the corresponding upstream. | ||
* @return the cluster name to be used for the upstream connection. | ||
If no route applies, returns the empty string. | ||
Review comment: nit: missing *
Reply: ok
||
*/ | ||
const std::string& getRouteFromEntries(Network::Connection& connection); | ||
|
||
const TcpProxyStats& stats() { return stats_; } | ||
|
||
private: | ||
struct Route { | ||
Review comment: Move out of class, to make it extensible? Similar to the weighted clusters use case? [It can be done later as well, but a TODO might help keep track of this.] @mattklein123 WDYT? Weighted clusters for TCP could follow a similar implementation to HTTP.
Reply: @rshriram I agree that ultimately we will want to make this more complicated, and probably use RDS, etc., but for now in the interest of time I think it's OK to keep this here since it's so simple. We can refactor it out in a future change.
||
Route(const Json::Object& config); | ||
|
||
Network::IpList source_ips_; | ||
Network::PortRangeList source_port_ranges_; | ||
Network::IpList destination_ips_; | ||
Network::PortRangeList destination_port_ranges_; | ||
std::string cluster_name_; | ||
}; | ||
|
||
static TcpProxyStats generateStats(const std::string& name, Stats::Store& store); | ||
|
||
-  std::string cluster_name_;
+  std::vector<Route> routes_;
const TcpProxyStats stats_; | ||
}; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -128,11 +128,44 @@ const std::string Json::Schema::REDIS_PROXY_NETWORK_FILTER_SCHEMA(R"EOF( | |
const std::string Json::Schema::TCP_PROXY_NETWORK_FILTER_SCHEMA(R"EOF( | ||
 {
   "$schema": "http://json-schema.org/schema#",
-  "properties":{
+  "properties": {
     "stat_prefix" : {"type" : "string"},
-    "cluster" : {"type" : "string"}
+    "route_config" : {
"type" : "object", | ||
"properties": { | ||
"routes" : { | ||
"type" : "array", | ||
"items" : { | ||
"cluster": { | ||
"type" : "string" | ||
}, | ||
"source_ip_list" : { | ||
"type" : "array", | ||
"items" : { | ||
"type" : "string" | ||
} | ||
}, | ||
"source__ports": { | ||
Review comment: source_ports
Reply: Thanks. Besides the typo there was an error in the schema. I fixed it and verified that without the fix the typo would have been caught by a test.
||
"type" : "string" | ||
}, | ||
"destination_ip_list" : { | ||
"type" : "array", | ||
"items" : { | ||
"type" : "string" | ||
} | ||
}, | ||
"destination_ports": { | ||
"type" : "string" | ||
} | ||
}, | ||
"required": ["cluster"], | ||
"additionalProperties": false | ||
} | ||
}, | ||
"additionalProperties": false | ||
} | ||
   },
-  "required": ["stat_prefix", "cluster"],
+  "required": ["stat_prefix", "route_config"],
"additionalProperties": false | ||
} | ||
)EOF"); |
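The schema above types `source_ports` and `destination_ports` as plain strings, which the `Route` constructor hands to `Network::Utility::parsePortRangeList`. As a rough sketch of what parsing such a string can involve, the following assumes a comma-separated list whose items are either a single port or a `min-max` range; that syntax is an assumption made for illustration, and `parsePort`, `parsePortRanges` and `portInRanges` are hypothetical helpers, not Envoy's implementation.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical type for this sketch; Envoy's own PortRange may differ.
struct PortRange {
  uint32_t min;
  uint32_t max;
};

// Parses one decimal port number, rejecting empty, non-numeric, or out-of-range text.
uint32_t parsePort(const std::string& text) {
  if (text.empty() || !std::isdigit(static_cast<unsigned char>(text[0]))) {
    throw std::invalid_argument("invalid port: '" + text + "'");
  }
  std::size_t consumed = 0;
  unsigned long value = 0;
  try {
    value = std::stoul(text, &consumed);
  } catch (const std::exception&) {
    throw std::invalid_argument("invalid port: '" + text + "'");
  }
  if (consumed != text.size() || value > 65535) {
    throw std::invalid_argument("invalid port: '" + text + "'");
  }
  return static_cast<uint32_t>(value);
}

// Assumed input syntax: "1-1024,2048-4096,12345".
std::vector<PortRange> parsePortRanges(const std::string& spec) {
  std::vector<PortRange> ranges;
  std::istringstream stream(spec);
  std::string item;
  while (std::getline(stream, item, ',')) {
    const std::size_t dash = item.find('-');
    PortRange range;
    if (dash == std::string::npos) {
      range.min = range.max = parsePort(item); // single port
    } else {
      range.min = parsePort(item.substr(0, dash));
      range.max = parsePort(item.substr(dash + 1));
    }
    if (range.min > range.max) {
      throw std::invalid_argument("inverted port range: '" + item + "'");
    }
    ranges.push_back(range);
  }
  return ranges;
}

// True if the port falls inside any of the parsed ranges (bounds inclusive).
bool portInRanges(uint32_t port, const std::vector<PortRange>& ranges) {
  for (const PortRange& r : ranges) {
    assert(r.min <= r.max);
    if (port >= r.min && port <= r.max) return true;
  }
  return false;
}
```

Because the schema only checks that the value is a string, malformed ranges can only be caught at parse time, so a helper along these lines has to report errors itself rather than rely on schema validation.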
Review comment: I'd probably put those optional config settings here as well, so that anyone checking this template file can quickly get a sense of all the options.
Will leave it up to you.
These configs are loaded as part of Envoy tests as well.
Reply: OK, I had the impression that these templates were used for generating actual configs. If they are just loaded during Envoy tests, I will add them so that the parsing logic gets exercised.
Reply: I didn't find a good real-world example for this because in configgen.py the tcp_proxy instances and the clusters are coupled (since there used to be a 1:1 mapping). Let me think a bit more about this while you guys review the new version.