Skip to content

Commit

Permalink
Listener: introduce filter chain context (envoyproxy#9205)
Browse files Browse the repository at this point in the history
Description: Introduce filter chain context.
Goal: Support the future work of adding filter chain without drain the whole listener, and deleting one filter chain by draining only the connection associated with the deleted filter chain.

The ListenerFactoryContext should cover FilterChainFactoryContext, and filter chain context should cover the life of all the associated connections referring to the filter chain.

In this PR the filter chain contexts are not yet destructed independently. I have follow up PRs to release the power of filter chain contexts.

Risk Level: LOW
Testing: unit test

Addressing envoyproxy#4540 3/N

Signed-off-by: Yuchen Dai <silentdai@gmail.com>

Signed-off-by: Prakhar Gautam <prakhag@gmail.com>
  • Loading branch information
lambdai authored and prakhag committed Jan 8, 2020
1 parent 6ccbabe commit dd739cd
Show file tree
Hide file tree
Showing 22 changed files with 443 additions and 85 deletions.
24 changes: 18 additions & 6 deletions include/envoy/server/filter_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ class FactoryContext : public virtual CommonFactoryContext {
virtual AccessLog::AccessLogManager& accessLogManager() PURE;

/**
* @return envoy::api::v2::core::TrafficDirection the direction of the traffic relative to the
* local proxy.
* @return envoy::config::core::v3alpha::TrafficDirection the direction of the traffic relative to
* the local proxy.
*/
virtual envoy::config::core::v3alpha::TrafficDirection direction() const PURE;

Expand Down Expand Up @@ -168,7 +168,7 @@ class FactoryContext : public virtual CommonFactoryContext {
virtual Stats::Scope& listenerScope() PURE;

/**
* @return const envoy::api::v2::core::Metadata& the config metadata associated with this
* @return const envoy::config::core::v3alpha::Metadata& the config metadata associated with this
* listener.
*/
virtual const envoy::config::core::v3alpha::Metadata& listenerMetadata() const PURE;
Expand Down Expand Up @@ -201,6 +201,17 @@ class FactoryContext : public virtual CommonFactoryContext {
virtual ProtobufMessage::ValidationVisitor& messageValidationVisitor() PURE;
};

/**
* An implementation of FactoryContext. The life time is no shorter than the created filter chains.
* The life time is no longer than the owning listener. It should be used to create
* NetworkFilterChain.
*/
class FilterChainFactoryContext : public virtual FactoryContext {};

/**
* An implementation of FactoryContext. The life time should cover the lifetime of the filter chains
* and connections. It can be used to create ListenerFilterChain.
*/
class ListenerFactoryContext : public virtual FactoryContext {
public:
/**
Expand Down Expand Up @@ -308,11 +319,12 @@ class NamedNetworkFilterConfigFactory : public ProtocolOptionsFactory {
* produce a factory with the provided parameters, it should throw an EnvoyException. The returned
* callback should always be initialized.
* @param config supplies the general json configuration for the filter
* @param context supplies the filter's context.
* @param filter_chain_factory_context supplies the filter's context.
* @return Network::FilterFactoryCb the factory creation function.
*/
virtual Network::FilterFactoryCb createFilterFactoryFromProto(const Protobuf::Message& config,
FactoryContext& context) PURE;
virtual Network::FilterFactoryCb
createFilterFactoryFromProto(const Protobuf::Message& config,
FactoryContext& filter_chain_factory_context) PURE;

std::string category() const override { return "filters.network"; }

Expand Down
2 changes: 1 addition & 1 deletion include/envoy/server/listener_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class ListenerComponentFactory {
*/
virtual std::vector<Network::FilterFactoryCb> createNetworkFilterFactoryList(
const Protobuf::RepeatedPtrField<envoy::config::listener::v3alpha::Filter>& filters,
Configuration::FactoryContext& context) PURE;
Server::Configuration::FilterChainFactoryContext& filter_chain_factory_context) PURE;

/**
* Creates a list of listener filter factories.
Expand Down
15 changes: 14 additions & 1 deletion source/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ envoy_cc_library(
envoy_cc_library(
name = "drain_manager_lib",
srcs = ["drain_manager_impl.cc"],
hdrs = ["drain_manager_impl.h"],
hdrs = [
"drain_manager_impl.h",
],
deps = [
"//include/envoy/event:dispatcher_interface",
"//include/envoy/event:timer_interface",
Expand Down Expand Up @@ -340,10 +342,12 @@ envoy_cc_library(
srcs = ["filter_chain_manager_impl.cc"],
hdrs = ["filter_chain_manager_impl.h"],
deps = [
":filter_chain_factory_context_callback",
"//include/envoy/server:listener_manager_interface",
"//include/envoy/server:transport_socket_config_interface",
"//source/common/common:empty_string",
"//source/common/config:utility_lib",
"//source/common/init:manager_lib",
"//source/common/network:cidr_range_lib",
"//source/common/network:lc_trie_lib",
"//source/server:configuration_lib",
Expand Down Expand Up @@ -509,3 +513,12 @@ envoy_cc_library(
"@envoy_api//envoy/api/v2/listener:pkg_cc_proto",
],
)

envoy_cc_library(
name = "filter_chain_factory_context_callback",
hdrs = ["filter_chain_factory_context_callback.h"],
deps = [
"//include/envoy/server:filter_config_interface",
"@envoy_api//envoy/api/v2/listener:pkg_cc_proto",
],
)
5 changes: 3 additions & 2 deletions source/server/config_validation/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ class ValidationInstance final : Logger::Loggable<Logger::Id::main>,
}
std::vector<Network::FilterFactoryCb> createNetworkFilterFactoryList(
const Protobuf::RepeatedPtrField<envoy::config::listener::v3alpha::Filter>& filters,
Configuration::FactoryContext& context) override {
return ProdListenerComponentFactory::createNetworkFilterFactoryList_(filters, context);
Server::Configuration::FilterChainFactoryContext& filter_chain_factory_context) override {
return ProdListenerComponentFactory::createNetworkFilterFactoryList_(
filters, filter_chain_factory_context);
}
std::vector<Network::ListenerFilterFactoryCb> createListenerFilterFactoryList(
const Protobuf::RepeatedPtrField<envoy::config::listener::v3alpha::ListenerFilter>& filters,
Expand Down
77 changes: 58 additions & 19 deletions source/server/connection_handler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,21 @@ void ConnectionHandlerImpl::enableListeners() {

void ConnectionHandlerImpl::ActiveTcpListener::removeConnection(ActiveTcpConnection& connection) {
ENVOY_CONN_LOG(debug, "adding to cleanup list", *connection.connection_);
ActiveTcpConnectionPtr removed = connection.removeFromList(connections_);
ActiveConnections& active_connections = connection.active_connections_;
ActiveTcpConnectionPtr removed = connection.removeFromList(active_connections.connections_);
parent_.dispatcher_.deferredDelete(std::move(removed));
// Delete map entry only iff connections becomes empty.
if (active_connections.connections_.empty()) {
auto iter = connections_by_context_.find(&active_connections.filter_chain_);
ASSERT(iter != connections_by_context_.end());
// To cover the lifetime of every single connection, Connections need to be deferred deleted
// because the previously contained connection is deferred deleted.
parent_.dispatcher_.deferredDelete(std::move(iter->second));
// The erase will break the iteration over the connections_by_context_ during the deletion.
if (!is_deleting_) {
connections_by_context_.erase(iter);
}
}
}

ConnectionHandlerImpl::ActiveListenerImplBase::ActiveListenerImplBase(
Expand Down Expand Up @@ -116,6 +129,7 @@ ConnectionHandlerImpl::ActiveTcpListener::ActiveTcpListener(ConnectionHandlerImp
}

ConnectionHandlerImpl::ActiveTcpListener::~ActiveTcpListener() {
is_deleting_ = true;
config_.connectionBalancer().unregisterHandler(*this);

// Purge sockets that have not progressed to connections. This should only happen when
Expand All @@ -125,10 +139,13 @@ ConnectionHandlerImpl::ActiveTcpListener::~ActiveTcpListener() {
parent_.dispatcher_.deferredDelete(std::move(removed));
}

while (!connections_.empty()) {
connections_.front()->connection_->close(Network::ConnectionCloseType::NoFlush);
for (auto& chain_and_connections : connections_by_context_) {
ASSERT(chain_and_connections.second != nullptr);
auto& connections = chain_and_connections.second->connections_;
while (!connections.empty()) {
connections.front()->connection_->close(Network::ConnectionCloseType::NoFlush);
}
}

parent_.dispatcher_.clearDeferredDeleteList();

// By the time a listener is destroyed, in the common case, there should be no connections.
Expand Down Expand Up @@ -317,8 +334,9 @@ void ConnectionHandlerImpl::ActiveTcpListener::newConnection(
}

auto transport_socket = filter_chain->transportSocketFactory().createTransportSocket(nullptr);
auto& active_connections = getOrCreateActiveConnections(*filter_chain);
ActiveTcpConnectionPtr active_connection(new ActiveTcpConnection(
*this,
active_connections,
parent_.dispatcher_.createServerConnection(std::move(socket), std::move(transport_socket)),
parent_.dispatcher_.timeSource()));
active_connection->connection_->setBufferLimits(config_.perConnectionBufferLimitBytes());
Expand All @@ -334,8 +352,19 @@ void ConnectionHandlerImpl::ActiveTcpListener::newConnection(
if (active_connection->connection_->state() != Network::Connection::State::Closed) {
ENVOY_CONN_LOG(debug, "new connection", *active_connection->connection_);
active_connection->connection_->addConnectionCallbacks(*active_connection);
active_connection->moveIntoList(std::move(active_connection), connections_);
active_connection->moveIntoList(std::move(active_connection), active_connections.connections_);
}
// TODO(lambdai): defer delete active_connections when supporting per tag drain
}

ConnectionHandlerImpl::ActiveConnections&
ConnectionHandlerImpl::ActiveTcpListener::getOrCreateActiveConnections(
const Network::FilterChain& filter_chain) {
ActiveConnectionsPtr& connections = connections_by_context_[&filter_chain];
if (connections == nullptr) {
connections = std::make_unique<ConnectionHandlerImpl::ActiveConnections>(*this, filter_chain);
}
return *connections;
}

namespace {
Expand Down Expand Up @@ -374,37 +403,47 @@ void ConnectionHandlerImpl::ActiveTcpListener::post(Network::ConnectionSocketPtr
});
}

ConnectionHandlerImpl::ActiveConnections::ActiveConnections(
ConnectionHandlerImpl::ActiveTcpListener& listener, const Network::FilterChain& filter_chain)
: listener_(listener), filter_chain_(filter_chain) {}

ConnectionHandlerImpl::ActiveConnections::~ActiveConnections() {
// connections should be defer deleted already.
ASSERT(connections_.empty());
}

ConnectionHandlerImpl::ActiveTcpConnection::ActiveTcpConnection(
ActiveTcpListener& listener, Network::ConnectionPtr&& new_connection, TimeSource& time_source)
: listener_(listener), connection_(std::move(new_connection)),
ActiveConnections& active_connections, Network::ConnectionPtr&& new_connection,
TimeSource& time_source)
: active_connections_(active_connections), connection_(std::move(new_connection)),
conn_length_(new Stats::HistogramCompletableTimespanImpl(
listener_.stats_.downstream_cx_length_ms_, time_source)) {
active_connections_.listener_.stats_.downstream_cx_length_ms_, time_source)) {
// We just universally set no delay on connections. Theoretically we might at some point want
// to make this configurable.
connection_->noDelay(true);

listener_.stats_.downstream_cx_total_.inc();
listener_.stats_.downstream_cx_active_.inc();
listener_.per_worker_stats_.downstream_cx_total_.inc();
listener_.per_worker_stats_.downstream_cx_active_.inc();
active_connections_.listener_.stats_.downstream_cx_total_.inc();
active_connections_.listener_.stats_.downstream_cx_active_.inc();
active_connections_.listener_.per_worker_stats_.downstream_cx_total_.inc();
active_connections_.listener_.per_worker_stats_.downstream_cx_active_.inc();

// Active connections on the handler (not listener). The per listener connections have already
// been incremented at this point either via the connection balancer or in the socket accept
// path if there is no configured balancer.
++listener_.parent_.num_handler_connections_;
++active_connections_.listener_.parent_.num_handler_connections_;
}

ConnectionHandlerImpl::ActiveTcpConnection::~ActiveTcpConnection() {
listener_.stats_.downstream_cx_active_.dec();
listener_.stats_.downstream_cx_destroy_.inc();
listener_.per_worker_stats_.downstream_cx_active_.dec();
active_connections_.listener_.stats_.downstream_cx_active_.dec();
active_connections_.listener_.stats_.downstream_cx_destroy_.inc();
active_connections_.listener_.per_worker_stats_.downstream_cx_active_.dec();
conn_length_->complete();

// Active listener connections (not handler).
listener_.decNumConnections();
active_connections_.listener_.decNumConnections();

// Active handler connections (not listener).
listener_.parent_.decNumConnections();
active_connections_.listener_.parent_.decNumConnections();
}

ActiveUdpListener::ActiveUdpListener(Network::ConnectionHandler& parent,
Expand Down
30 changes: 25 additions & 5 deletions source/server/connection_handler_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler,
using ActiveTcpConnectionPtr = std::unique_ptr<ActiveTcpConnection>;
struct ActiveTcpSocket;
using ActiveTcpSocketPtr = std::unique_ptr<ActiveTcpSocket>;
class ActiveConnections;
using ActiveConnectionsPtr = std::unique_ptr<ActiveConnections>;

/**
* Wrapper for an active tcp listener owned by this handler.
Expand Down Expand Up @@ -136,16 +138,34 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler,
*/
void newConnection(Network::ConnectionSocketPtr&& socket);

ActiveConnections& getOrCreateActiveConnections(const Network::FilterChain& filter_chain);

ConnectionHandlerImpl& parent_;
Network::ListenerPtr listener_;
const std::chrono::milliseconds listener_filters_timeout_;
const bool continue_on_listener_filters_timeout_;
std::list<ActiveTcpSocketPtr> sockets_;
std::list<ActiveTcpConnectionPtr> connections_;
std::unordered_map<const Network::FilterChain*, ActiveConnectionsPtr> connections_by_context_;

// The number of connections currently active on this listener. This is typically used for
// connection balancing across per-handler listeners.
std::atomic<uint64_t> num_listener_connections_{};
bool is_deleting_{false};
};

/**
* Wrapper for a group of active connections which are attached to the same filter chain context.
*/
class ActiveConnections : public Event::DeferredDeletable {
public:
ActiveConnections(ActiveTcpListener& listener, const Network::FilterChain& filter_chain);
~ActiveConnections();

// listener filter chain pair is the owner of the connections
ActiveTcpListener& listener_;
const Network::FilterChain& filter_chain_;
// Owned connections
std::list<ActiveTcpConnectionPtr> connections_;
};

/**
Expand All @@ -154,22 +174,22 @@ class ConnectionHandlerImpl : public Network::ConnectionHandler,
struct ActiveTcpConnection : LinkedObject<ActiveTcpConnection>,
public Event::DeferredDeletable,
public Network::ConnectionCallbacks {
ActiveTcpConnection(ActiveTcpListener& listener, Network::ConnectionPtr&& new_connection,
TimeSource& time_system);
ActiveTcpConnection(ActiveConnections& active_connections,
Network::ConnectionPtr&& new_connection, TimeSource& time_system);
~ActiveTcpConnection() override;

// Network::ConnectionCallbacks
void onEvent(Network::ConnectionEvent event) override {
// Any event leads to destruction of the connection.
if (event == Network::ConnectionEvent::LocalClose ||
event == Network::ConnectionEvent::RemoteClose) {
listener_.removeConnection(*this);
active_connections_.listener_.removeConnection(*this);
}
}
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}

ActiveTcpListener& listener_;
ActiveConnections& active_connections_;
Network::ConnectionPtr connection_;
Stats::TimespanPtr conn_length_;
};
Expand Down
29 changes: 29 additions & 0 deletions source/server/filter_chain_factory_context_callback.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include <memory>

#include "envoy/api/v2/listener/listener.pb.h"
#include "envoy/common/pure.h"
#include "envoy/server/filter_config.h"

namespace Envoy {
namespace Server {

/**
* Handles FilterChainFactoryContext creation. It is used by a listener to add a new filter chain
* without worrying about the lifetime of each factory context.
*/
class FilterChainFactoryContextCreator {
public:
virtual ~FilterChainFactoryContextCreator() = default;

/**
* Generate the filter chain factory context from proto. Note the caller does not own the filter
* chain context.
*/
virtual Configuration::FilterChainFactoryContext& createFilterChainFactoryContext(
const ::envoy::config::listener::v3alpha::FilterChain* const filter_chain) PURE;
};

} // namespace Server
} // namespace Envoy
Loading

0 comments on commit dd739cd

Please sign in to comment.