Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eds: decrease computational complexity of updates #11442

Merged
merged 24 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b6393c6
New eds_speed_tests and temporary complexity annotations in upstream_…
pgenera Jun 3, 2020
779aa74
Remove N^2 behavior in updateDynamicHostList, write a benchmark for it.
pgenera Jun 4, 2020
5005fcf
Run pre-push hooks
pgenera Jun 4, 2020
de4eeb7
Remove a note I missed in the prior pass
pgenera Jun 4, 2020
46a176e
Respond to (simple) review comments
pgenera Jun 5, 2020
b81bf9b
Merge remote-tracking branch 'upstream/master' into eds-nsquared
pgenera Jun 11, 2020
f846f8f
Respond to review comments, fix eds_speed_test.
pgenera Jun 12, 2020
dabdeb6
Merge remote-tracking branch 'upstream/master' into eds-nsquared
pgenera Jun 16, 2020
6d08d00
review comments, fix multiple calls to grpc initializers
pgenera Jun 17, 2020
7cccabb
respond to review comments
pgenera Jun 17, 2020
4d1acad
Solve the mystery of c++ templates.
pgenera Jun 17, 2020
d8929c3
std::remove_if instead of building it myself
pgenera Jun 19, 2020
b21f4e1
respond to review comments: longer test timeout.
pgenera Jun 19, 2020
5fb29eb
Respond to review comments, decrease benchmark iterations
pgenera Jun 22, 2020
3661e8f
check_format.py fix
pgenera Jun 22, 2020
6dd6560
appease spellchecker
pgenera Jun 22, 2020
5f4a5a7
Add a flag to skip (and lie about) slow benchmarks.
pgenera Jun 28, 2020
db737b9
spelling
pgenera Jun 28, 2020
3c37bc6
response to review comments
pgenera Jun 29, 2020
e99517d
respond to comments
pgenera Jun 29, 2020
60e769f
respond to review comments
pgenera Jul 6, 2020
3644413
Merge remote-tracking branch 'upstream/master' into eds-nsquared
pgenera Jul 6, 2020
558423a
respond to review comments
pgenera Jul 8, 2020
2b689d1
Kick CI
pgenera Jul 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions source/common/common/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -709,4 +709,32 @@ class InlineString : public InlineStorage {
char data_[];
};

/**
 * Utilities for working with containers.
 */
class Containers {
public:
  /**
   * Remove elements that match the predicate from the container by overwriting each match with
   * the last not-yet-examined element and then truncating the tail. Iterates over c once; the
   * order of the remaining elements is not preserved.
   *
   * Displaced elements are move-assigned rather than copied, so containers of move-only
   * elements are supported and reference-counted elements are not needlessly copied.
   *
   * @param c The container on which to operate. It must provide begin(), end(), iterators that
   *          can be decremented, and a range erase(first, last).
   * @param predicate A function which will be called once for every entry in c,
   *                  which returns true if that entry should be removed.
   */
  template <class Container, typename T>
  static void removeMatchingElements(Container& c, std::function<bool(const T&)> predicate) {
    // Everything in [erase_from, c.end()) is either already removed or already moved forward.
    auto erase_from = c.end();
    for (auto i = c.begin(); i != erase_from;) {
      if (predicate(*i)) {
        --erase_from;
        // When the match is the last unexamined element there is nothing to pull forward;
        // skipping the assignment also avoids a self-move.
        if (i != erase_from) {
          *i = std::move(*erase_from);
        }
        // Do not advance: the element just moved into *i has not been examined yet.
      } else {
        ++i;
      }
    }

    c.erase(erase_from, c.end());
  }
};

} // namespace Envoy
48 changes: 24 additions & 24 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1327,9 +1327,7 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,
bool hosts_changed = false;

// Go through and see if the list we have is different from what we just got. If it is, we make a
// new host list and raise a change notification. This uses an N^2 search given that this does not
// happen very often and the list sizes should be small (see
// https://github.com/envoyproxy/envoy/issues/2874). We also check for duplicates here. It's
// new host list and raise a change notification. We also check for duplicates here. It's
// possible for DNS to return the same address multiple times, and a bad EDS implementation could
// do the same thing.

Expand Down Expand Up @@ -1432,16 +1430,18 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,

// Remove hosts from current_priority_hosts that were matched to an existing host in the previous
// loop.
for (auto itr = current_priority_hosts.begin(); itr != current_priority_hosts.end();) {
auto existing_itr = existing_hosts_for_current_priority.find((*itr)->address()->asString());
std::function<bool(const HostSharedPtr&)> predicate =
[&existing_hosts_for_current_priority](const HostSharedPtr& p) mutable {
auto existing_itr = existing_hosts_for_current_priority.find(p->address()->asString());

if (existing_itr != existing_hosts_for_current_priority.end()) {
existing_hosts_for_current_priority.erase(existing_itr);
itr = current_priority_hosts.erase(itr);
} else {
itr++;
}
}
if (existing_itr != existing_hosts_for_current_priority.end()) {
existing_hosts_for_current_priority.erase(existing_itr);
return true;
}

return false;
};
Containers::removeMatchingElements(current_priority_hosts, predicate);

// If we saw existing hosts during this iteration from a different priority, then we've moved
// a host from another priority into this one, so we should mark the priority as having changed.
Expand All @@ -1459,22 +1459,22 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,
const bool dont_remove_healthy_hosts =
health_checker_ != nullptr && !info()->drainConnectionsOnHostRemoval();
if (!current_priority_hosts.empty() && dont_remove_healthy_hosts) {
for (auto i = current_priority_hosts.begin(); i != current_priority_hosts.end();) {
if (!((*i)->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) ||
(*i)->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) {
if ((*i)->weight() > max_host_weight) {
max_host_weight = (*i)->weight();
predicate = [&updated_hosts, &final_hosts, &max_host_weight](const HostSharedPtr& p) mutable {
if (!(p->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) ||
p->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) {
if (p->weight() > max_host_weight) {
max_host_weight = p->weight();
}

final_hosts.push_back(*i);
updated_hosts[(*i)->address()->asString()] = *i;
(*i)->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
i = current_priority_hosts.erase(i);
} else {
i++;
final_hosts.push_back(p);
updated_hosts[p->address()->asString()] = p;
p->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
return true;
}
}
return false;
};
}
Containers::removeMatchingElements(current_priority_hosts, predicate);
pgenera marked this conversation as resolved.
Show resolved Hide resolved

// At this point we've accounted for all the new hosts as well as the hosts that previously
// existed in this priority.
Expand Down
25 changes: 25 additions & 0 deletions test/common/common/utility_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -875,4 +875,29 @@ TEST(InlineStorageTest, InlineString) {
EXPECT_EQ("Hello, world!", hello->toString());
}

// Exercises Containers::removeMatchingElements with predicates that match no entry, exactly one
// entry, and every entry of a three-token list.
TEST(ContainerRemoveElementsTest, Containers) {
  using Predicate = std::function<bool(const absl::string_view&)>;
  Predicate match_none = [](const absl::string_view&) { return false; };
  Predicate match_one = [](const absl::string_view& token) { return token == "one"; };
  Predicate match_all = [](const absl::string_view&) { return true; };

  auto tokens = StringUtil::splitToken("one,two,three", ",");

  // A predicate that never matches leaves all three tokens in place.
  Containers::removeMatchingElements(tokens, match_none);
  EXPECT_EQ(3, tokens.size());

  // Removing "one" drops exactly one token...
  Containers::removeMatchingElements(tokens, match_one);
  EXPECT_EQ(2, tokens.size());
  // ...and the former last token has taken over the vacated first slot.
  EXPECT_EQ("three", tokens[0]);

  // A predicate that always matches empties the list.
  Containers::removeMatchingElements(tokens, match_all);
  EXPECT_EQ(0, tokens.size());
}

} // namespace Envoy
87 changes: 58 additions & 29 deletions test/common/upstream/eds_speed_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,24 @@ class EdsSpeedTest {
local_info_, std::unique_ptr<Grpc::MockAsyncClient>(async_client_), dispatcher_,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.service.endpoint.v3.EndpointDiscoveryService.StreamEndpoints"),
envoy::config::core::v3::ApiVersion::AUTO, random_, stats_, {}, true)) {}
envoy::config::core::v3::ApiVersion::AUTO, random_, stats_, {}, true)) {
resetCluster(R"EOF(
name: name
connect_timeout: 0.25s
type: EDS
eds_cluster_config:
service_name: fare
eds_config:
api_config_source:
cluster_names:
- eds
refresh_delay: 1s
)EOF",
Envoy::Upstream::Cluster::InitializePhase::Secondary);

EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(_));
cluster_->initialize([this] { initialized_ = true; });
}

void resetCluster(const std::string& yaml_config, Cluster::InitializePhase initialize_phase) {
local_info_.node_.mutable_locality()->set_zone("us-east-1a");
Expand All @@ -63,30 +80,13 @@ class EdsSpeedTest {
std::chrono::milliseconds(), false);
}

void initialize() {
EXPECT_CALL(*cm_.subscription_factory_.subscription_, start(_));
cluster_->initialize([this] { initialized_ = true; });
}

// Set up an EDS config with multiple priorities, localities, weights and make sure
// they are loaded and reloaded as expected.
void priorityAndLocalityWeightedHelper(bool ignore_unknown_dynamic_fields, size_t num_hosts) {
// they are loaded as expected.
void priorityAndLocalityWeightedHelper(bool ignore_unknown_dynamic_fields, size_t num_hosts,
bool healthy) {
state_.PauseTiming();
envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment;
cluster_load_assignment.set_cluster_name("fare");
resetCluster(R"EOF(
name: name
connect_timeout: 0.25s
type: EDS
eds_cluster_config:
service_name: fare
eds_config:
api_config_source:
cluster_names:
- eds
refresh_delay: 1s
)EOF",
Envoy::Upstream::Cluster::InitializePhase::Secondary);

// Add a whole bunch of hosts in a single place:
auto* endpoints = cluster_load_assignment.add_endpoints();
Expand All @@ -99,18 +99,21 @@ class EdsSpeedTest {

uint32_t port = 1000;
for (size_t i = 0; i < num_hosts; ++i) {
auto* socket_address = endpoints->add_lb_endpoints()
->mutable_endpoint()
->mutable_address()
->mutable_socket_address();
auto* lb_endpoint = endpoints->add_lb_endpoints();
if (healthy) {
lb_endpoint->set_health_status(envoy::config::core::v3::HEALTHY);
} else {
lb_endpoint->set_health_status(envoy::config::core::v3::UNHEALTHY);
}
auto* socket_address =
lb_endpoint->mutable_endpoint()->mutable_address()->mutable_socket_address();
socket_address->set_address("10.0.1." + std::to_string(i / 60000));
socket_address->set_port_value((port + i) % 60000);
}

// this is what we're actually testing:
validation_visitor_.setSkipValidation(ignore_unknown_dynamic_fields);

initialize();
auto response = std::make_unique<envoy::service::discovery::v3::DiscoveryResponse>();
response->set_type_url(type_url_);
auto* resource = response->mutable_resources()->Add();
Expand All @@ -121,11 +124,11 @@ class EdsSpeedTest {
"");
resource->set_type_url("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment");
}
EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(testing::Return(&async_stream_));
EXPECT_CALL(*async_client_, startRaw(_, _, _, _))
.WillRepeatedly(testing::Return(&async_stream_));
subscription_->start({"fare"});
state_.ResumeTiming();
grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response));
ASSERT(initialized_);
ASSERT(cluster_->prioritySet().hostSetsPerPriority()[1]->hostsPerLocality().get()[0].size() ==
num_hosts);
}
Expand Down Expand Up @@ -165,8 +168,34 @@ static void priorityAndLocalityWeighted(benchmark::State& state) {
Envoy::Logger::Logger::DEFAULT_LOG_FORMAT, lock, false);
for (auto _ : state) {
Envoy::Upstream::EdsSpeedTest speed_test(state, state.range(0));
speed_test.priorityAndLocalityWeightedHelper(state.range(1), state.range(2));
speed_test.priorityAndLocalityWeightedHelper(state.range(1), state.range(2), true);
}
}

BENCHMARK(priorityAndLocalityWeighted)->Ranges({{false, true}, {false, true}, {2000, 100000}});

// Benchmark: deliver the same update (state.range(0) endpoints, all marked HEALTHY) twice to a
// single EdsSpeedTest instance, so the second delivery repeats the host set of the first.
static void duplicateUpdate(benchmark::State& state) {
  Envoy::Thread::MutexBasicLockable log_lock;
  Envoy::Logger::Context log_context(spdlog::level::warn,
                                     Envoy::Logger::Logger::DEFAULT_LOG_FORMAT, log_lock, false);
  for (auto _ : state) {
    // NOTE(review): the meaning of the second constructor argument is not visible here.
    Envoy::Upstream::EdsSpeedTest speed_test(state, false);
    // Two identical passes: initial load followed by the duplicate update.
    for (int pass = 0; pass < 2; ++pass) {
      speed_test.priorityAndLocalityWeightedHelper(true, state.range(0), true);
    }
  }
}

BENCHMARK(duplicateUpdate)->Range(2000, 100000);

// Benchmark: deliver state.range(0) endpoints marked HEALTHY, then deliver the same number of
// endpoints again marked UNHEALTHY, to a single EdsSpeedTest instance.
static void healthOnlyUpdate(benchmark::State& state) {
  Envoy::Thread::MutexBasicLockable log_lock;
  Envoy::Logger::Context log_context(spdlog::level::warn,
                                     Envoy::Logger::Logger::DEFAULT_LOG_FORMAT, log_lock, false);
  constexpr bool ignore_unknown_dynamic_fields = true;
  for (auto _ : state) {
    // NOTE(review): the meaning of the second constructor argument is not visible here.
    Envoy::Upstream::EdsSpeedTest speed_test(state, false);
    // First pass: every endpoint reported healthy.
    speed_test.priorityAndLocalityWeightedHelper(ignore_unknown_dynamic_fields, state.range(0),
                                                 /*healthy=*/true);
    // Second pass: same endpoint count, now reported unhealthy.
    speed_test.priorityAndLocalityWeightedHelper(ignore_unknown_dynamic_fields, state.range(0),
                                                 /*healthy=*/false);
  }
}

BENCHMARK(healthOnlyUpdate)->Range(2000, 100000);