Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eds: decrease computational complexity of updates #11442

Merged
merged 24 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b6393c6
New eds_speed_tests and temporary complexity annotations in upstream_…
pgenera Jun 3, 2020
779aa74
Remove N^2 behavior in updateDynamicHostList, write a benchmark for it.
pgenera Jun 4, 2020
5005fcf
Run pre-push hooks
pgenera Jun 4, 2020
de4eeb7
Remove a note I missed in the prior pass
pgenera Jun 4, 2020
46a176e
Respond to (simple) review comments
pgenera Jun 5, 2020
b81bf9b
Merge remote-tracking branch 'upstream/master' into eds-nsquared
pgenera Jun 11, 2020
f846f8f
Respond to reivew comments, fix eds_speed_test.
pgenera Jun 12, 2020
dabdeb6
Merge remote-tracking branch 'upstream/master' into eds-nsquared
pgenera Jun 16, 2020
6d08d00
review comments, fix multiple calls to grpc initializers
pgenera Jun 17, 2020
7cccabb
respond to review comments
pgenera Jun 17, 2020
4d1acad
Solve the mystery of c++ templates.
pgenera Jun 17, 2020
d8929c3
std::remove_if instead of building it myself
pgenera Jun 19, 2020
b21f4e1
respond to review comments: longer test timeout.
pgenera Jun 19, 2020
5fb29eb
Respond to revievw comments, decrease benchmark iterations
pgenera Jun 22, 2020
3661e8f
check_format.py fix
pgenera Jun 22, 2020
6dd6560
appease spellchecker
pgenera Jun 22, 2020
5f4a5a7
Add a flag to skip (and lie about) slow benchmarks.
pgenera Jun 28, 2020
db737b9
spelling
pgenera Jun 28, 2020
3c37bc6
response to review comments
pgenera Jun 29, 2020
e99517d
respond to comments
pgenera Jun 29, 2020
60e769f
respond to review comments
pgenera Jul 6, 2020
3644413
Merge remote-tracking branch 'upstream/master' into eds-nsquared
pgenera Jul 6, 2020
558423a
respond to review comments
pgenera Jul 8, 2020
2b689d1
Kick CI
pgenera Jul 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions bazel/test_for_benchmark_wrapper.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#!/bin/bash

# Set the benchmark time to 0 to just verify that the benchmark runs to completion.
"${TEST_SRCDIR}/envoy/$@" --benchmark_min_time=0
# Set the benchmark time to 0 to just verify that the benchmark runs to
# completion. We're interacting with two different flag parsers, so the order
# of flags and the -- matters.
"${TEST_SRCDIR}/envoy/$@" --skip_expensive_benchmarks -- --benchmark_min_time=0
58 changes: 31 additions & 27 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1327,9 +1327,7 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,
bool hosts_changed = false;

// Go through and see if the list we have is different from what we just got. If it is, we make a
// new host list and raise a change notification. This uses an N^2 search given that this does not
// happen very often and the list sizes should be small (see
// https://github.com/envoyproxy/envoy/issues/2874). We also check for duplicates here. It's
// new host list and raise a change notification. We also check for duplicates here. It's
// possible for DNS to return the same address multiple times, and a bad EDS implementation could
// do the same thing.

Expand Down Expand Up @@ -1432,16 +1430,20 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,

// Remove hosts from current_priority_hosts that were matched to an existing host in the previous
// loop.
for (auto itr = current_priority_hosts.begin(); itr != current_priority_hosts.end();) {
auto existing_itr = existing_hosts_for_current_priority.find((*itr)->address()->asString());
auto erase_from =
std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(),
[&existing_hosts_for_current_priority](const HostSharedPtr& p) {
auto existing_itr =
existing_hosts_for_current_priority.find(p->address()->asString());

if (existing_itr != existing_hosts_for_current_priority.end()) {
existing_hosts_for_current_priority.erase(existing_itr);
itr = current_priority_hosts.erase(itr);
} else {
itr++;
}
}
if (existing_itr != existing_hosts_for_current_priority.end()) {
existing_hosts_for_current_priority.erase(existing_itr);
return true;
}

return false;
});
current_priority_hosts.erase(erase_from, current_priority_hosts.end());

// If we saw existing hosts during this iteration from a different priority, then we've moved
// a host from another priority into this one, so we should mark the priority as having changed.
Expand All @@ -1459,21 +1461,23 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,
const bool dont_remove_healthy_hosts =
health_checker_ != nullptr && !info()->drainConnectionsOnHostRemoval();
if (!current_priority_hosts.empty() && dont_remove_healthy_hosts) {
for (auto i = current_priority_hosts.begin(); i != current_priority_hosts.end();) {
if (!((*i)->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) ||
(*i)->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) {
if ((*i)->weight() > max_host_weight) {
max_host_weight = (*i)->weight();
}

final_hosts.push_back(*i);
updated_hosts[(*i)->address()->asString()] = *i;
(*i)->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
i = current_priority_hosts.erase(i);
} else {
i++;
}
}
erase_from =
std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(),
[&updated_hosts, &final_hosts, &max_host_weight](const HostSharedPtr& p) {
if (!(p->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) ||
p->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) {
if (p->weight() > max_host_weight) {
max_host_weight = p->weight();
}

final_hosts.push_back(p);
updated_hosts[p->address()->asString()] = p;
p->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
return true;
}
return false;
});
current_priority_hosts.erase(erase_from, current_priority_hosts.end());
}

// At this point we've accounted for all the new hosts as well the hosts that previously
Expand Down
2 changes: 2 additions & 0 deletions test/benchmark/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ envoy_package()
envoy_cc_test_library(
name = "main",
srcs = ["main.cc"],
hdrs = ["main.h"],
external_deps = [
"benchmark",
"tclap",
],
deps = [
"//test/test_common:environment_lib",
Expand Down
32 changes: 28 additions & 4 deletions test/benchmark/main.cc
Original file line number Diff line number Diff line change
@@ -1,16 +1,40 @@
// NOLINT(namespace-envoy)
// This is an Envoy driver for benchmarks.
#include "test/benchmark/main.h"

#include "test/test_common/environment.h"

#include "benchmark/benchmark.h"
#include "tclap/CmdLine.h"

bool skip_expensive_benchmarks = false;
pgenera marked this conversation as resolved.
Show resolved Hide resolved

// Boilerplate main(), which discovers benchmarks and runs them.
// Boilerplate main(), which discovers benchmarks and runs them. This uses two
// different flag parsers, so the order of flags matters: flags defined here
// must be passed first, and flags defined in benchmark::Initialize second,
// separated by --.
// TODO(pgenera): convert this to abseil/flags/ when benchmark also adopts abseil.
int main(int argc, char** argv) {
Envoy::TestEnvironment::initializeTestMain(argv[0]);

benchmark::Initialize(&argc, argv);
if (benchmark::ReportUnrecognizedArguments(argc, argv)) {
return 1;
// NOLINTNEXTLINE(clang-analyzer-optin.cplusplus.VirtualCall)
TCLAP::CmdLine cmd("envoy-benchmark-test", ' ', "0.1");
pgenera marked this conversation as resolved.
Show resolved Hide resolved
TCLAP::SwitchArg skip_switch("s", "skip_expensive_benchmarks",
"skip or minimize expensive benchmarks", cmd, false);

cmd.setExceptionHandling(false);
try {
cmd.parse(argc, argv);
} catch (const TCLAP::ExitException& e) {
// parse() throws an ExitException with status 0 after printing the output
// for --help and --version.
return 0;
}

skip_expensive_benchmarks = skip_switch.getValue();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add some big nice WARNING when this flag is enabled in order to increase the chances of someone noticing the difference between envoy_cc_benchmarks and tests for those benchmarks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in #12121


benchmark::Initialize(&argc, argv);
benchmark::RunSpecifiedBenchmarks();
}

bool SkipExpensiveBenchmarks() { return skip_expensive_benchmarks; }
pgenera marked this conversation as resolved.
Show resolved Hide resolved
7 changes: 7 additions & 0 deletions test/benchmark/main.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#pragma once
// NOLINT(namespace-envoy)

/**
* Benchmarks can use this to skip or hurry through long-running tests in CI.
*/
bool SkipExpensiveBenchmarks();
102 changes: 71 additions & 31 deletions test/common/upstream/eds_speed_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "server/transport_socket_config_impl.h"

#include "test/benchmark/main.h"
#include "test/common/upstream/utility.h"
#include "test/mocks/local_info/mocks.h"
#include "test/mocks/protobuf/mocks.h"
Expand Down Expand Up @@ -43,7 +44,26 @@ class EdsSpeedTest {
local_info_, std::unique_ptr<Grpc::MockAsyncClient>(async_client_), dispatcher_,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.endpoint.v3.EndpointDiscoveryService.StreamEndpoints"),
envoy::config::core::v3::ApiVersion::AUTO, random_, stats_, {}, true)) {}
envoy::config::core::v3::ApiVersion::AUTO, random_, stats_, {}, true)) {
resetCluster(R"EOF(
name: name
connect_timeout: 0.25s
type: EDS
eds_cluster_config:
service_name: fare
eds_config:
api_config_source:
cluster_names:
- eds
refresh_delay: 1s
)EOF",
Envoy::Upstream::Cluster::InitializePhase::Secondary);

EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(_));
cluster_->initialize([this] { initialized_ = true; });
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(testing::Return(&async_stream_));
subscription_->start({"fare"});
}

void resetCluster(const std::string& yaml_config, Cluster::InitializePhase initialize_phase) {
local_info_.node_.mutable_locality()->set_zone("us-east-1a");
Expand All @@ -63,30 +83,14 @@ class EdsSpeedTest {
std::chrono::milliseconds(), false);
}

void initialize() {
EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(_));
cluster_->initialize([this] { initialized_ = true; });
}

// Set up an EDS config with multiple priorities, localities, weights and make sure
// they are loaded and reloaded as expected.
void priorityAndLocalityWeightedHelper(bool ignore_unknown_dynamic_fields, size_t num_hosts) {
// they are loaded as expected.
void priorityAndLocalityWeightedHelper(bool ignore_unknown_dynamic_fields, size_t num_hosts,
bool healthy) {
state_.PauseTiming();

envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment;
cluster_load_assignment.set_cluster_name("fare");
resetCluster(R"EOF(
name: name
connect_timeout: 0.25s
type: EDS
eds_cluster_config:
service_name: fare
eds_config:
api_config_source:
cluster_names:
- eds
refresh_delay: 1s
)EOF",
Envoy::Upstream::Cluster::InitializePhase::Secondary);

// Add a whole bunch of hosts in a single place:
auto* endpoints = cluster_load_assignment.add_endpoints();
Expand All @@ -99,18 +103,21 @@ class EdsSpeedTest {

uint32_t port = 1000;
for (size_t i = 0; i < num_hosts; ++i) {
auto* socket_address = endpoints->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
auto* lb_endpoint = endpoints->add_lb_endpoints();
if (healthy) {
lb_endpoint->set_health_status(envoy::config::core::v3::HEALTHY);
} else {
lb_endpoint->set_health_status(envoy::config::core::v3::UNHEALTHY);
}
auto* socket_address =
lb_endpoint->mutable_endpoint()->mutable_address()->mutable_socket_address();
socket_address->set_address("10.0.1." + std::to_string(i / 60000));
socket_address->set_port_value((port + i) % 60000);
}

// this is what we're actually testing:
validation_visitor_.setSkipValidation(ignore_unknown_dynamic_fields);

initialize();
auto response = std::make_unique<envoy::service::discovery::v3::DiscoveryResponse>();
response->set_type_url(type_url_);
auto* resource = response->mutable_resources()->Add();
Expand All @@ -121,11 +128,8 @@ class EdsSpeedTest {
"");
resource->set_type_url("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment");
}
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(testing::Return(&async_stream_));
subscription_->start({"fare"});
state_.ResumeTiming();
grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response));
ASSERT(initialized_);
ASSERT(cluster_->prioritySet().hostSetsPerPriority()[1]->hostsPerLocality().get()[0].size() ==
num_hosts);
}
Expand Down Expand Up @@ -165,8 +169,44 @@ static void priorityAndLocalityWeighted(benchmark::State& state) {
Envoy::Logger::Logger::DEFAULT_LOG_FORMAT, lock, false);
for (auto _ : state) {
Envoy::Upstream::EdsSpeedTest speed_test(state, state.range(0));
speed_test.priorityAndLocalityWeightedHelper(state.range(1), state.range(2));
// if we've been instructed to skip tests, only run once no matter the argument:
uint32_t endpoints = SkipExpensiveBenchmarks() ? 1 : state.range(2);

speed_test.priorityAndLocalityWeightedHelper(state.range(1), endpoints, true);
}
}

BENCHMARK(priorityAndLocalityWeighted)
->Ranges({{false, true}, {false, true}, {1, 100000}})
->Unit(benchmark::kMillisecond);

static void duplicateUpdate(benchmark::State& state) {
Envoy::Thread::MutexBasicLockable lock;
Envoy::Logger::Context logging_state(spdlog::level::warn,
Envoy::Logger::Logger::DEFAULT_LOG_FORMAT, lock, false);

for (auto _ : state) {
Envoy::Upstream::EdsSpeedTest speed_test(state, false);
uint32_t endpoints = SkipExpensiveBenchmarks() ? 1 : state.range(0);

speed_test.priorityAndLocalityWeightedHelper(true, endpoints, true);
speed_test.priorityAndLocalityWeightedHelper(true, endpoints, true);
}
}

BENCHMARK(duplicateUpdate)->Range(1, 100000)->Unit(benchmark::kMillisecond);

static void healthOnlyUpdate(benchmark::State& state) {
Envoy::Thread::MutexBasicLockable lock;
Envoy::Logger::Context logging_state(spdlog::level::warn,
Envoy::Logger::Logger::DEFAULT_LOG_FORMAT, lock, false);
for (auto _ : state) {
Envoy::Upstream::EdsSpeedTest speed_test(state, false);
uint32_t endpoints = SkipExpensiveBenchmarks() ? 1 : state.range(0);

speed_test.priorityAndLocalityWeightedHelper(true, endpoints, true);
speed_test.priorityAndLocalityWeightedHelper(true, endpoints, false);
}
}

BENCHMARK(priorityAndLocalityWeighted)->Ranges({{false, true}, {false, true}, {2000, 100000}});
BENCHMARK(healthOnlyUpdate)->Range(1, 100000)->Unit(benchmark::kMillisecond);