Skip to content

Commit

Permalink
Listener: reset the file event when destroying listener filters (envo…
Browse files Browse the repository at this point in the history
…yproxy#16952)

The listener filter may add event on the new socket, but it won't cleanup the event.
If continue_on_listener_filters_timeout is set, the new connection may add same event
to the socket. So reset the file event before initialize new one.

Signed-off-by: He Jie Xu <hejie.xu@intel.com>
  • Loading branch information
soulxu authored and Le Yao committed Sep 30, 2021
1 parent 579792f commit 660fd6c
Show file tree
Hide file tree
Showing 15 changed files with 138 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ using ConfigSharedPtr = std::shared_ptr<Config>;
class Filter : public Network::ListenerFilter, Logger::Loggable<Logger::Id::filter> {
public:
Filter(const ConfigSharedPtr config);
~Filter() override {
if (cb_) {
cb_->socket().ioHandle().resetFileEvents();
}
}

// Network::ListenerFilter
Network::FilterStatus onAccept(Network::ListenerFilterCallbacks& cb) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ enum class ReadOrParseState { Done, TryAgainLater, Error };
class Filter : public Network::ListenerFilter, Logger::Loggable<Logger::Id::filter> {
public:
Filter(const ConfigSharedPtr& config) : config_(config) {}

~Filter() override {
if (cb_) {
cb_->socket().ioHandle().resetFileEvents();
}
}
// Network::ListenerFilter
Network::FilterStatus onAccept(Network::ListenerFilterCallbacks& cb) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ using ConfigSharedPtr = std::shared_ptr<Config>;
class Filter : public Network::ListenerFilter, Logger::Loggable<Logger::Id::filter> {
public:
Filter(const ConfigSharedPtr config);
~Filter() override {
if (cb_) {
cb_->socket().ioHandle().resetFileEvents();
}
}

// Network::ListenerFilter
Network::FilterStatus onAccept(Network::ListenerFilterCallbacks& cb) override;
Expand All @@ -85,7 +90,7 @@ class Filter : public Network::ListenerFilter, Logger::Loggable<Logger::Id::filt
void onServername(absl::string_view name);

ConfigSharedPtr config_;
Network::ListenerFilterCallbacks* cb_;
Network::ListenerFilterCallbacks* cb_{};

bssl::UniquePtr<SSL> ssl_;
uint64_t read_{0};
Expand Down
6 changes: 2 additions & 4 deletions source/server/active_tcp_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,8 @@ void ActiveTcpSocket::newConnection() {
if (socket_->detectedTransportProtocol().empty()) {
socket_->setDetectedTransportProtocol("raw_buffer");
}
// TODO(lambdai): add integration test
// TODO: Address issues in wider scope. See https://github.com/envoyproxy/envoy/issues/8925
// Erase accept filter states because accept filters may not get the opportunity to clean up.
// Particularly the assigned events need to reset before assigning new events in the follow up.
// Clear the listener filter to ensure the file event registered by
// listener filter to be removed. reference https://github.com/envoyproxy/envoy/issues/8925.
accept_filters_.clear();
// Create a new connection on this listener.
listener_.newConnection(std::move(socket_), std::move(stream_info_));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Extensions {
namespace ListenerFilters {

void ListenerFilterFuzzer::fuzz(
Network::ListenerFilter& filter,
Network::ListenerFilterPtr filter,
const test::extensions::filters::listener::FilterFuzzTestCase& input) {
try {
socket_.addressProvider().setLocalAddress(
Expand All @@ -32,7 +32,7 @@ void ListenerFilterFuzzer::fuzz(
testing::ReturnNew<NiceMock<Event::MockFileEvent>>()));
}

filter.onAccept(cb_);
filter->onAccept(cb_);

if (file_event_callback_ == nullptr) {
// If filter does not call createFileEvent (i.e. original_dst and original_src)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ListenerFilterFuzzer {
ON_CALL(Const(cb_), dynamicMetadata()).WillByDefault(testing::ReturnRef(metadata_));
}

void fuzz(Network::ListenerFilter& filter,
void fuzz(Network::ListenerFilterPtr filter,
const test::extensions::filters::listener::FilterFuzzTestCase& input);

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ DEFINE_PROTO_FUZZER(const test::extensions::filters::listener::FilterFuzzTestCas
auto filter = std::make_unique<Filter>(cfg);

ListenerFilterFuzzer fuzzer;
fuzzer.fuzz(*filter, input);
fuzzer.fuzz(std::move(filter), input);
}

} // namespace HttpInspector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,20 @@ class HttpInspectorTest : public testing::Test {
HttpInspectorTest()
: cfg_(std::make_shared<Config>(store_)),
io_handle_(std::make_unique<Network::IoSocketHandleImpl>(42)) {}
~HttpInspectorTest() override { io_handle_->close(); }
~HttpInspectorTest() override {
filter_.reset();
EXPECT_CALL(dispatcher_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(ReturnNew<NiceMock<Event::MockFileEvent>>());
// This is used to test the FileEvent was reset by the listener filters.
// Otherwise the assertion inside `initializeFileEvent` will be trigger.
io_handle_->initializeFileEvent(
dispatcher_, [](uint32_t) -> void {}, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed);
io_handle_->resetFileEvents();
io_handle_->close();
}

void init(bool include_inline_recv = true) {
filter_ = std::make_unique<Filter>(cfg_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ DEFINE_PROTO_FUZZER(const test::extensions::filters::listener::FilterFuzzTestCas
auto filter =
std::make_unique<OriginalDstFilter>(envoy::config::core::v3::TrafficDirection::UNSPECIFIED);
ListenerFilterFuzzer fuzzer;
fuzzer.fuzz(*filter, input);
fuzzer.fuzz(std::move(filter), input);
}

} // namespace OriginalDst
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ DEFINE_PROTO_FUZZER(
Config config(input.config());
auto filter = std::make_unique<OriginalSrcFilter>(config);
ListenerFilterFuzzer fuzzer;
fuzzer.fuzz(*filter, input.fuzzed());
fuzzer.fuzz(std::move(filter), input.fuzzed());
}

} // namespace OriginalSrc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ DEFINE_PROTO_FUZZER(
auto filter = std::make_unique<Filter>(std::move(cfg));

ListenerFilterFuzzer fuzzer;
fuzzer.fuzz(*filter, input.fuzzed());
fuzzer.fuzz(std::move(filter), input.fuzzed());
}

} // namespace ProxyProtocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class ProxyProtocolTest : public testing::TestWithParam<Network::Address::IpVers
protected Logger::Loggable<Logger::Id::main> {
public:
ProxyProtocolTest()
: api_(Api::createApiForTest(stats_store_)),
: api_(Api::createApiForTest(stats_store_, time_system_)),
dispatcher_(api_->allocateDispatcher("test_thread")),
socket_(std::make_shared<Network::TcpListenSocket>(
Network::Test::getCanonicalLoopbackAddress(GetParam()), nullptr, true)),
Expand All @@ -78,8 +78,10 @@ class ProxyProtocolTest : public testing::TestWithParam<Network::Address::IpVers
bool bindToPort() override { return true; }
bool handOffRestoredDestinationConnections() const override { return false; }
uint32_t perConnectionBufferLimitBytes() const override { return 0; }
std::chrono::milliseconds listenerFiltersTimeout() const override { return {}; }
bool continueOnListenerFiltersTimeout() const override { return false; }
std::chrono::milliseconds listenerFiltersTimeout() const override {
return std::chrono::milliseconds(1000);
}
bool continueOnListenerFiltersTimeout() const override { return true; }
Stats::Scope& listenerScope() override { return stats_store_; }
uint64_t listenerTag() const override { return 1; }
ResourceLimit& openConnections() override { return open_connections_; }
Expand Down Expand Up @@ -183,6 +185,7 @@ class ProxyProtocolTest : public testing::TestWithParam<Network::Address::IpVers
EXPECT_EQ(stats_store_.counter("downstream_cx_proxy_proto_error").value(), 1);
}

Event::SimulatedTimeSystemHelper time_system_;
Stats::TestUtil::TestStore stats_store_;
Api::ApiPtr api_;
BasicResourceLimitImpl open_connections_;
Expand All @@ -209,6 +212,23 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, ProxyProtocolTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);

// This test ensures the socket file event was reset after timeout, otherwise
// the assertion which avoid to create file event duplicated will be triggered.
TEST_P(ProxyProtocolTest, Timeout) {
connect();
time_system_.advanceTimeAndRun(std::chrono::milliseconds(2000), *dispatcher_,
Event::Dispatcher::RunType::NonBlock);
if (GetParam() == Envoy::Network::Address::IpVersion::v4) {
EXPECT_EQ(server_connection_->addressProvider().remoteAddress()->ip()->addressAsString(),
"127.0.0.1");
} else {
EXPECT_EQ(server_connection_->addressProvider().remoteAddress()->ip()->addressAsString(),
"::1");
}
EXPECT_EQ(stats_store_.counter("downstream_cx_total").value(), 1);
disconnect();
}

TEST_P(ProxyProtocolTest, V1Basic) {
connect();
write("PROXY TCP4 1.2.3.4 253.253.253.253 65535 1234\r\nmore data");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ DEFINE_PROTO_FUZZER(
auto filter = std::make_unique<Filter>(std::move(cfg));

ListenerFilterFuzzer fuzzer;
fuzzer.fuzz(*filter, input.fuzzed());
fuzzer.fuzz(std::move(filter), input.fuzzed());
}

} // namespace TlsInspector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,20 @@ class TlsInspectorTest : public testing::TestWithParam<std::tuple<uint16_t, uint
TlsInspectorTest()
: cfg_(std::make_shared<Config>(store_)),
io_handle_(std::make_unique<Network::IoSocketHandleImpl>(42)) {}
~TlsInspectorTest() override { io_handle_->close(); }
~TlsInspectorTest() override {
filter_.reset();
EXPECT_CALL(dispatcher_,
createFileEvent_(_, _, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed))
.WillOnce(ReturnNew<NiceMock<Event::MockFileEvent>>());
// This is used to test the FileEvent was reset by the listener filters.
// Otherwise the assertion inside `initializeFileEvent` will be trigger.
io_handle_->initializeFileEvent(
dispatcher_, [](uint32_t) -> void {}, Event::PlatformDefaultTriggerType,
Event::FileReadyType::Read | Event::FileReadyType::Closed);
io_handle_->resetFileEvents();
io_handle_->close();
}

void init() {
filter_ = std::make_unique<Filter>(cfg_);
Expand Down
84 changes: 61 additions & 23 deletions test/integration/listener_filter_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,44 +48,64 @@ class ListenerFilterIntegrationTest : public testing::TestWithParam<Network::Add
}
}

void initializeWithListenerFilter(absl::optional<bool> listener_filter_disabled = absl::nullopt) {
void initializeWithListenerFilter(bool ssl_client,
absl::optional<bool> listener_filter_disabled = absl::nullopt) {
config_helper_.renameListener("echo");
std::string tls_inspector_config = ConfigHelper::tlsInspectorFilter();
if (listener_filter_disabled.has_value()) {
tls_inspector_config = appendMatcher(tls_inspector_config, listener_filter_disabled.value());
}
config_helper_.addListenerFilter(tls_inspector_config);
config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
auto* filter_chain =
bootstrap.mutable_static_resources()->mutable_listeners(0)->mutable_filter_chains(0);
auto* alpn = filter_chain->mutable_filter_chain_match()->add_application_protocols();
*alpn = "envoyalpn";

config_helper_.addConfigModifier([ssl_client](
envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
if (ssl_client) {
auto* filter_chain =
bootstrap.mutable_static_resources()->mutable_listeners(0)->mutable_filter_chains(0);
auto* alpn = filter_chain->mutable_filter_chain_match()->add_application_protocols();
*alpn = "envoyalpn";
}
auto* timeout = bootstrap.mutable_static_resources()
->mutable_listeners(0)
->mutable_listener_filters_timeout();
timeout->MergeFrom(ProtobufUtil::TimeUtil::MillisecondsToDuration(1000));
bootstrap.mutable_static_resources()
->mutable_listeners(0)
->set_continue_on_listener_filters_timeout(true);
});
config_helper_.addSslConfig();
if (ssl_client) {
config_helper_.addSslConfig();
}

useListenerAccessLog("%RESPONSE_CODE_DETAILS%");
BaseIntegrationTest::initialize();

context_manager_ =
std::make_unique<Extensions::TransportSockets::Tls::ContextManagerImpl>(timeSystem());
}

void setupConnections(bool listener_filter_disabled, bool expect_connection_open) {
initializeWithListenerFilter(listener_filter_disabled);
void setupConnections(bool listener_filter_disabled, bool expect_connection_open,
bool ssl_client) {
initializeWithListenerFilter(ssl_client, listener_filter_disabled);

// Set up the SSL client.
Network::Address::InstanceConstSharedPtr address =
Ssl::getSslAddress(version_, lookupPort("echo"));
context_ = Ssl::createClientSslTransportSocketFactory({}, *context_manager_, *api_);
ssl_client_ = dispatcher_->createClientConnection(
address, Network::Address::InstanceConstSharedPtr(),
context_->createTransportSocket(
// nullptr
std::make_shared<Network::TransportSocketOptionsImpl>(
absl::string_view(""), std::vector<std::string>(),
std::vector<std::string>{"envoyalpn"})),
nullptr);
ssl_client_->addConnectionCallbacks(connect_callbacks_);
ssl_client_->connect();
Network::TransportSocketPtr transport_socket;
if (ssl_client) {
transport_socket =
context_->createTransportSocket(std::make_shared<Network::TransportSocketOptionsImpl>(
absl::string_view(""), std::vector<std::string>(),
std::vector<std::string>{"envoyalpn"}));
} else {
auto transport_socket_factory = std::make_unique<Network::RawBufferSocketFactory>();
transport_socket = transport_socket_factory->createTransportSocket(nullptr);
}
client_ = dispatcher_->createClientConnection(
address, Network::Address::InstanceConstSharedPtr(), std::move(transport_socket), nullptr);
client_->addConnectionCallbacks(connect_callbacks_);
client_->connect();
while (!connect_callbacks_.connected() && !connect_callbacks_.closed()) {
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
}
Expand All @@ -98,27 +118,45 @@ class ListenerFilterIntegrationTest : public testing::TestWithParam<Network::Add
ASSERT(connect_callbacks_.closed());
}
}

std::unique_ptr<Ssl::ContextManager> context_manager_;
Network::TransportSocketFactoryPtr context_;
ConnectionStatusCallbacks connect_callbacks_;
testing::NiceMock<Secret::MockSecretManager> secret_manager_;
Network::ClientConnectionPtr ssl_client_;
Network::ClientConnectionPtr client_;
};

// Each listener filter is enabled by default.
TEST_P(ListenerFilterIntegrationTest, AllListenerFiltersAreEnabledByDefault) {
setupConnections(/*listener_filter_disabled=*/false, /*expect_connection_open=*/true);
ssl_client_->close(Network::ConnectionCloseType::NoFlush);
setupConnections(/*listener_filter_disabled=*/false, /*expect_connection_open=*/true,
/*ssl_client=*/true);
client_->close(Network::ConnectionCloseType::NoFlush);
EXPECT_THAT(waitForAccessLog(listener_access_log_name_), testing::Eq("-"));
}

// The tls_inspector is disabled. The ALPN won't be sniffed out and no filter chain is matched.
TEST_P(ListenerFilterIntegrationTest, DisabledTlsInspectorFailsFilterChainFind) {
setupConnections(/*listener_filter_disabled=*/true, /*expect_connection_open=*/false);
setupConnections(/*listener_filter_disabled=*/true, /*expect_connection_open=*/false,
/*ssl_client=*/true);
EXPECT_THAT(waitForAccessLog(listener_access_log_name_),
testing::Eq(StreamInfo::ResponseCodeDetails::get().FilterChainNotFound));
}

// trigger the tls inspect filter timeout, and continue create new connection after timeout
TEST_P(ListenerFilterIntegrationTest, ContinueOnListenerTimeout) {
setupConnections(/*listener_filter_disabled=*/false, /*expect_connection_open=*/true,
/*ssl_client=*/false);
// The length of tls hello message is defined as `TLS_MAX_CLIENT_HELLO = 64 * 1024`
// if tls inspect filter doesn't read the max length of hello message data, it
// will continue wait. Then the listener filter timeout timer will be triggered.
Buffer::OwnedImpl buffer("fake data");
client_->write(buffer, false);
// the timeout is set as one seconds, sleep 5 to trigger the timeout.
timeSystem().advanceTimeWaitImpl(std::chrono::milliseconds(2000));
client_->close(Network::ConnectionCloseType::NoFlush);
EXPECT_THAT(waitForAccessLog(listener_access_log_name_), testing::Eq("-"));
}

INSTANTIATE_TEST_SUITE_P(IpVersions, ListenerFilterIntegrationTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);
Expand Down

0 comments on commit 660fd6c

Please sign in to comment.