Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: implement initial heuristic for binding alternate interface #1858

Merged
merged 18 commits into from
Oct 16, 2021
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ envoy_cc_extension(
":filter_cc_proto",
"//library/common/http:internal_headers_lib",
"//library/common/network:configurator_lib",
"//library/common/stream_info:aux_stream_info_lib",
"//library/common/types:c_types_lib",
"@envoy//envoy/http:codes_interface",
"@envoy//envoy/http:filter_interface",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include "envoy/server/filter_config.h"

#include "library/common/stream_info/aux_stream_info.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
Expand All @@ -10,21 +12,38 @@ namespace NetworkConfiguration {
Http::FilterHeadersStatus NetworkConfigurationFilter::decodeHeaders(Http::RequestHeaderMap&, bool) {
ENVOY_LOG(debug, "NetworkConfigurationFilter::decodeHeaders");

envoy_network_t network = network_configurator_->getPreferredNetwork();
ENVOY_LOG(debug, "current preferred network: {}", network);
goaway marked this conversation as resolved.
Show resolved Hide resolved
auto& aux_stream_info = StreamInfo::AuxProvider::get(decoder_callbacks_->streamInfo());
auto options = std::make_shared<Network::Socket::Options>();

network_configurator_->setInterfaceBindingEnabled(enable_interface_binding_);
aux_stream_info.configuration_key_ = network_configurator_->addUpstreamSocketOptions(options);
decoder_callbacks_->addUpstreamSocketOptions(options);

return Http::FilterHeadersStatus::Continue;
}

if (enable_interface_binding_) {
override_interface_ = network_configurator_->overrideInterface(network);
}
ENVOY_LOG(debug, "will override interface: {}", override_interface_);
Http::FilterHeadersStatus NetworkConfigurationFilter::encodeHeaders(Http::ResponseHeaderMap&, bool) {
ENVOY_LOG(debug, "NetworkConfigurationFilter::encodeHeaders");

auto connection_options =
network_configurator_->getUpstreamSocketOptions(network, override_interface_);
decoder_callbacks_->addUpstreamSocketOptions(connection_options);
auto& aux_stream_info = StreamInfo::AuxProvider::get(decoder_callbacks_->streamInfo());
network_configurator_->reportNetworkUsage(aux_stream_info.configuration_key_.value(), false);

return Http::FilterHeadersStatus::Continue;
}

// Http::StreamFilterBase
// Reports the outcome of a locally generated reply to the network configurator so it can keep its
// fault accounting for the configuration this stream was started with.
//
// @param reply the local reply data; only its status code is inspected here.
// @return always Http::LocalErrorStatus::ContinueAndResetStream.
Http::LocalErrorStatus NetworkConfigurationFilter::onLocalReply(const LocalReplyData& reply) {
  ENVOY_LOG(debug, "NetworkConfigurationFilter::onLocalReply");

  auto& stream_info = decoder_callbacks_->streamInfo();
  auto& aux_stream_info = StreamInfo::AuxProvider::get(stream_info);

  // configuration_key_ is only assigned in decodeHeaders(). If a local reply is produced before
  // this filter has seen request headers, there is no configuration to attribute the result to,
  // and calling value() on the empty optional would be an error, so skip reporting.
  if (!aux_stream_info.configuration_key_.has_value()) {
    return Http::LocalErrorStatus::ContinueAndResetStream;
  }

  // A local reply counts as a network fault only when it carries an error status (>= 400) and
  // no byte was ever received from the upstream.
  const bool success_status = static_cast<int>(reply.code_) < 400;
  const bool fault = !success_status && !stream_info.firstUpstreamRxByteReceived().has_value();
  network_configurator_->reportNetworkUsage(aux_stream_info.configuration_key_.value(), fault);

  return Http::LocalErrorStatus::ContinueAndResetStream;
}

} // namespace NetworkConfiguration
} // namespace HttpFilters
} // namespace Extensions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ class NetworkConfigurationFilter final : public Http::PassThroughFilter,
NetworkConfigurationFilter(Network::ConfiguratorSharedPtr network_configurator,
bool enable_interface_binding)
: network_configurator_(network_configurator),
enable_interface_binding_(enable_interface_binding), override_interface_(false) {}
enable_interface_binding_(enable_interface_binding) {}

// Http::StreamDecoderFilter
Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& headers,
bool end_stream) override;
Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap&, bool) override;
// Http::StreamEncoderFilter
Http::FilterHeadersStatus encodeHeaders(Http::ResponseHeaderMap&, bool) override;
// Http::StreamFilterBase
Http::LocalErrorStatus onLocalReply(const LocalReplyData&) override;

private:
Network::ConfiguratorSharedPtr network_configurator_;
bool enable_interface_binding_;
bool override_interface_;
};

} // namespace NetworkConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ envoy_cc_extension(
deps = [
":predicate_cc_proto",
"//library/common/network:configurator_lib",
"//library/common/stream_info:aux_stream_info_lib",
"@envoy//envoy/upstream:retry_interface",
],
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "library/common/extensions/retry/options/network_configuration/predicate.h"

#include "library/common/stream_info/aux_stream_info.h"

namespace Envoy {
namespace Extensions {
namespace Retry {
Expand All @@ -15,8 +17,26 @@ NetworkConfigurationRetryOptionsPredicate::NetworkConfigurationRetryOptionsPredi

Upstream::RetryOptionsPredicate::UpdateOptionsReturn
NetworkConfigurationRetryOptionsPredicate::updateOptions(
const Upstream::RetryOptionsPredicate::UpdateOptionsParameters&) const {
return {};
const Upstream::RetryOptionsPredicate::UpdateOptionsParameters& parameters) const {

const auto& stream_info = parameters.retriable_request_stream_info_;
auto& aux_stream_info = StreamInfo::AuxProvider::get(stream_info);

bool fault = !stream_info.firstUpstreamRxByteReceived().has_value();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps move this and the filter fault calculation into a shared static utility so it's a bit easier to comment above why we are doing various things?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added comments to the two locations where we carry out this check (one in the filter, and one in the predicate), but opted not to factor into a shared utility just yet. If this check evolves to become more complicated, and/or we end up calling it from a third place, I'd definitely switch to a shared utility.

// TODO(goaway): The predicate has no inherent way to know the prior configuration key so we need
// some way to retrieve it. Options:
// 1. store the configuration key directly in stream info (but it would need to be non-const)
// 2. store a unique identifier in stream info that can be keyed in a singleton "extra info" map
// 3. store a metadata map in stream info (also needs to be non-const here)
// 4. store in an extension point on stream info for adding extra fields
// Here we use option 2), but 4) seems like a cleaner long-term strategy.
RELEASE_ASSERT(aux_stream_info.configuration_key_.has_value(), "aux stream info missing");
network_configurator_->reportNetworkUsage(aux_stream_info.configuration_key_.value(), fault);

auto options = std::make_shared<Network::Socket::Options>();
aux_stream_info.configuration_key_ = network_configurator_->addUpstreamSocketOptions(options);
Upstream::RetryOptionsPredicate::UpdateOptionsReturn ret{options};
return ret;
}

} // namespace Options
Expand Down
1 change: 1 addition & 0 deletions library/common/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ envoy_cc_library(
"//library/common/network:configurator_lib",
"//library/common/network:synthetic_address_lib",
"//library/common/types:c_types_lib",
"//library/common/stream_info:aux_stream_info_lib",
"@envoy//envoy/buffer:buffer_interface",
"@envoy//envoy/common:scope_tracker_interface",
"@envoy//envoy/event:deferred_deletable",
Expand Down
5 changes: 5 additions & 0 deletions library/common/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "library/common/http/header_utility.h"
#include "library/common/http/headers.h"
#include "library/common/network/configurator.h"
#include "library/common/stream_info/aux_stream_info.h"

namespace Envoy {
namespace Http {
Expand Down Expand Up @@ -336,6 +337,8 @@ void Client::startStream(envoy_stream_t new_stream_handle, envoy_http_callbacks
direct_stream->request_decoder_ =
&api_listener_.newStream(*direct_stream->callbacks_, true /* is_internally_created */);

auto aux_info = StreamInfo::AuxProvider::create(direct_stream->request_decoder_->streamInfo());
aux_info.stream_id_ = new_stream_handle;
streams_.emplace(new_stream_handle, std::move(direct_stream));
ENVOY_LOG(debug, "[S{}] start stream", new_stream_handle);
}
Expand Down Expand Up @@ -501,6 +504,8 @@ void Client::removeStream(envoy_stream_t stream_handle) {
"[S{}] removeStream is a private method that is only called with stream ids that exist",
stream_handle));

StreamInfo::AuxProvider::clear(direct_stream->request_decoder_->streamInfo());

// The DirectStream should live through synchronous code that already has a reference to it.
// Hence why it is scheduled for deferred deletion. If this was all that was needed then it
// would be sufficient to return a shared_ptr in getStream. However, deferred deletion is still
Expand Down
14 changes: 6 additions & 8 deletions library/common/main_interface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,12 @@ envoy_status_t reset_stream(envoy_stream_t stream) {
}

envoy_status_t set_preferred_network(envoy_network_t network) {
envoy_network_t previous = Envoy::Network::Configurator::setPreferredNetwork(network);
if (previous != network && previous != ENVOY_NET_GENERIC) {
if (auto e = engine()) {
e->dispatcher().post([network]() -> void {
if (auto e = engine())
e->networkConfigurator().refreshDns(network);
});
}
uint64_t configuration_key = Envoy::Network::Configurator::setPreferredNetwork(network);
if (auto e = engine()) {
e->dispatcher().post([configuration_key]() -> void {
if (auto e = engine())
e->networkConfigurator().refreshDns(configuration_key);
});
}
return ENVOY_SUCCESS;
}
Expand Down
94 changes: 82 additions & 12 deletions library/common/network/configurator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,30 +67,93 @@ SINGLETON_MANAGER_REGISTRATION(network_configurator);

constexpr absl::string_view BaseDnsCache = "base_dns_cache";

std::atomic<envoy_network_t> Configurator::preferred_network_{ENVOY_NET_GENERIC};
std::atomic<Configurator::NetworkState> Configurator::network_state_{
Configurator::NetworkState{1, ENVOY_NET_GENERIC, 3, false}};

envoy_network_t Configurator::setPreferredNetwork(envoy_network_t network) {
uint16_t Configurator::setPreferredNetwork(envoy_network_t network) {
ENVOY_LOG_EVENT(debug, "network_configuration_network_change", std::to_string(network));
return preferred_network_.exchange(network);

Configurator::NetworkState expected_state = network_state_.load();
Configurator::NetworkState new_state;
do {
// ++configuration_key_, network_ = network, remaining_faults_ = 1, overridden_ = false
uint16_t new_configuration_key = expected_state.configuration_key_ + 1;
new_state =
Configurator::NetworkState{new_configuration_key, static_cast<uint16_t>(network), 1, false};
} while (!network_state_.compare_exchange_weak(expected_state, new_state));

return new_state.configuration_key_;
}

envoy_network_t Configurator::getPreferredNetwork() { return preferred_network_.load(); }
// Returns the network type stored in the current snapshot of the atomic network state.
envoy_network_t Configurator::getPreferredNetwork() {
  const Configurator::NetworkState snapshot = network_state_.load();
  return static_cast<envoy_network_t>(snapshot.network_);
}

bool Configurator::overrideInterface(envoy_network_t) { return false; }
// If the configuration_key isn't current, don't do anything.
// If there was no fault (i.e. success) reset remaining_faults_ to 3.
// If there was a network fault, decrement remaining_faults_.
// - At 0, increment configuration_key, reset remaining_faults_ to 1 and toggle overridden_.
void Configurator::reportNetworkUsage(uint16_t configuration_key, bool network_fault) {
if (!enable_interface_binding_) {
return;
}

Configurator::NetworkState expected_state = network_state_.load();
Configurator::NetworkState new_state;
do {
goaway marked this conversation as resolved.
Show resolved Hide resolved
// If the configuration_key isn't current, don't do anything.
if (configuration_key != expected_state.configuration_key_) {
return;
}

uint16_t new_configuration_key = configuration_key;
uint16_t network = expected_state.network_;
uint16_t remaining_faults = expected_state.remaining_faults_;
bool overridden = expected_state.overridden_;

if (!network_fault) {
// If there was no fault (i.e. success) reset remaining_faults_ to 3.
remaining_faults = 3;
goaway marked this conversation as resolved.
Show resolved Hide resolved
} else {
// If there was a network fault, decrement remaining_faults_.
remaining_faults--;
ASSERT(remaining_faults >= 0);

// At 0, increment configuration_key, reset remaining_faults_ to 1 and toggle overridden_.
if (remaining_faults == 0) {
++new_configuration_key;
overridden = !overridden;
remaining_faults = 1;
}
}

void Configurator::refreshDns(envoy_network_t network) {
// refreshDns is intended to be queued on Envoy's event loop, whereas preferred_network_ is
// updated synchronously. In the event that multiple refreshes become queued on the event loop,
new_state =
Configurator::NetworkState{new_configuration_key, network, remaining_faults, overridden};
} while (!network_state_.compare_exchange_weak(expected_state, new_state));
goaway marked this conversation as resolved.
Show resolved Hide resolved

// If overridden state changed, refresh dns.
if (expected_state.overridden_ != new_state.overridden_) {
refreshDns(new_state.configuration_key_);
}
}

// Stores the flag that reportNetworkUsage() and getUpstreamSocketOptions() check before acting.
void Configurator::setInterfaceBindingEnabled(bool enabled) { enable_interface_binding_ = enabled; }

void Configurator::refreshDns(uint16_t configuration_key) {
// refreshDns must be queued on Envoy's event loop, whereas network_state_ is updated
// synchronously. In the event that multiple refreshes become queued on the event loop,
// this avoids triggering a refresh for a non-current network.
// Note this does NOT completely prevent parallel refreshes from being triggered in multiple
// flip-flop scenarios.
if (network != preferred_network_.load()) {
ENVOY_LOG_EVENT(debug, "network_configuration_dns_flipflop", std::to_string(network));
if (configuration_key != network_state_.load().configuration_key_) {
ENVOY_LOG_EVENT(debug, "network_configuration_dns_flipflop", std::to_string(configuration_key));
return;
}

if (auto dns_cache = dns_cache_manager_->lookUpCacheByName(BaseDnsCache)) {
ENVOY_LOG_EVENT(debug, "network_configuration_refresh_dns", std::to_string(network));
ENVOY_LOG_EVENT(debug, "network_configuration_refresh_dns", std::to_string(configuration_key));
dns_cache->forceRefreshHosts();
} else {
ENVOY_LOG_EVENT(warn, "network_configuration_dns_cache_missing", BaseDnsCache);
Expand All @@ -107,7 +170,7 @@ std::vector<std::string> Configurator::enumerateV6Interfaces() {

Socket::OptionsSharedPtr Configurator::getUpstreamSocketOptions(envoy_network_t network,
bool override_interface) {
if (override_interface && network != ENVOY_NET_GENERIC) {
if (enable_interface_binding_ && override_interface && network != ENVOY_NET_GENERIC) {
return getAlternateInterfaceSocketOptions(network);
}
// Envoy uses the hash signature of overridden socket options to choose a connection pool.
Expand Down Expand Up @@ -147,6 +210,13 @@ Socket::OptionsSharedPtr Configurator::getAlternateInterfaceSocketOptions(envoy_
return options;
}

// Appends the upstream socket options for the current network state to `options`.
//
// @param options the option list to extend in place.
// @return the configuration key of the state snapshot the appended options were derived from.
uint16_t Configurator::addUpstreamSocketOptions(Socket::OptionsSharedPtr options) {
  // Load the state once so the derived options and the returned key come from the same snapshot.
  const NetworkState snapshot = network_state_.load();
  const auto preferred_network = static_cast<envoy_network_t>(snapshot.network_);
  const auto derived_options = getUpstreamSocketOptions(preferred_network, snapshot.overridden_);

  options->insert(options->end(), derived_options->begin(), derived_options->end());
  return snapshot.configuration_key_;
}

const std::string Configurator::getActiveAlternateInterface(envoy_network_t network,
unsigned short family) {
// Attempt to derive an active interface that differs from the passed network parameter.
Expand Down
Loading