Skip to content

Commit

Permalink
[forge] Mainnet Like Network Simulation (#7533)
Browse files Browse the repository at this point in the history
This PR adds the ability to simulate a network that is as close to a real one as possible in Forge. It adds a new NetEm config that supports emulating delay, loss, corruption, and bandwidth limits between any two targets within the same test, overcoming the existing drawback that multiple chaos simulations could not be stacked.

It also provides MultiRegionNetworkEmulationTest, which simulates a multi-region network and optionally adds chaos between nodes in the same region group. In addition, CpuChaosTest stresses the CPU to mimic validators that have different numbers of CPU cores.
  • Loading branch information
ibalajiarun authored and gedigi committed Jun 6, 2023
1 parent 6267ea1 commit fa618bc
Show file tree
Hide file tree
Showing 9 changed files with 601 additions and 199 deletions.
21 changes: 13 additions & 8 deletions testsuite/forge-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use aptos_testcases::{
fullnode_reboot_stress_test::FullNodeRebootStressTest,
generate_traffic,
load_vs_perf_benchmark::{LoadVsPerfBenchmark, TransactionWorkload, Workloads},
modifiers::{ExecutionDelayConfig, ExecutionDelayTest},
multi_region_simulation_test::MultiRegionMultiCloudSimulationTest,
modifiers::{CpuChaosTest, ExecutionDelayConfig, ExecutionDelayTest},
multi_region_network_test::MultiRegionNetworkEmulationTest,
network_bandwidth_test::NetworkBandwidthTest,
network_loss_test::NetworkLossTest,
network_partition_test::NetworkPartitionTest,
Expand Down Expand Up @@ -537,9 +537,7 @@ fn single_test_suite(test_name: &str) -> Result<ForgeConfig<'static>> {
run_consensus_only_three_region_simulation(config)
},
"quorum_store_reconfig_enable_test" => quorum_store_reconfig_enable_test(config),
"multi_region_multi_cloud_simulation_test" => {
multi_region_multi_cloud_simulation_test(config)
},
"mainnet_like_simulation_test" => mainnet_like_simulation_test(config),
"multiregion_benchmark_test" => multiregion_benchmark_test(config),
_ => return Err(format_err!("Invalid --suite given: {:?}", test_name)),
};
Expand Down Expand Up @@ -1583,17 +1581,24 @@ fn quorum_store_reconfig_enable_test(forge_config: ForgeConfig<'static>) -> Forg
)
}

fn multi_region_multi_cloud_simulation_test(config: ForgeConfig<'static>) -> ForgeConfig<'static> {
fn mainnet_like_simulation_test(config: ForgeConfig<'static>) -> ForgeConfig<'static> {
config
.with_initial_validator_count(NonZeroUsize::new(100).unwrap())
.with_initial_validator_count(NonZeroUsize::new(20).unwrap())
.with_emit_job(
EmitJobRequest::default()
.mode(EmitJobMode::MaxLoad {
mempool_backlog: 200_000,
})
.txn_expiration_time_secs(5 * 60),
)
.with_network_tests(vec![&MultiRegionMultiCloudSimulationTest {}])
.with_network_tests(vec![&CompositeNetworkTest {
wrapper: &MultiRegionNetworkEmulationTest {
override_config: None,
},
test: &CpuChaosTest {
override_config: None,
},
}])
.with_genesis_helm_config_fn(Arc::new(|helm_values| {
// no epoch change.
helm_values["chain"]["epoch_duration_secs"] = (24 * 3600).into();
Expand Down
100 changes: 98 additions & 2 deletions testsuite/forge/src/backend/k8s/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
dump_string_to_file, K8sSwarm, Result, Swarm, SwarmChaos, SwarmNetworkBandwidth,
SwarmNetworkDelay, SwarmNetworkLoss, SwarmNetworkPartition, KUBECTL_BIN,
dump_string_to_file, K8sSwarm, Result, Swarm, SwarmChaos, SwarmCpuStress, SwarmNetEm,
SwarmNetworkBandwidth, SwarmNetworkDelay, SwarmNetworkLoss, SwarmNetworkPartition, KUBECTL_BIN,
};
use anyhow::bail;
use aptos_logger::info;
Expand Down Expand Up @@ -32,6 +32,18 @@ macro_rules! NETWORK_LOSS_CHAOS_TEMPLATE {
};
}

// Path of the NetEm `NetworkChaos` manifest template, resolved relative to this source file
// by `include_str!`. It is a macro rather than a `const` so that it expands to a string
// literal, which `format!(include_str!(...))` requires.
macro_rules! NETEM_CHAOS_TEMPLATE {
() => {
"chaos/netem.yaml"
};
}

// Path of the CPU `StressChaos` manifest template, resolved relative to this source file
// by `include_str!`. It is a macro rather than a `const` so that it expands to a string
// literal, which `format!(include_str!(...))` requires.
macro_rules! CPU_STRESS_CHAOS_TEMPLATE {
() => {
"chaos/cpu_stress.yaml"
};
}

impl K8sSwarm {
/// Injects the SwarmChaos into the specified namespace
pub fn inject_swarm_chaos(&self, chaos: &SwarmChaos) -> Result<()> {
Expand Down Expand Up @@ -166,12 +178,96 @@ impl K8sSwarm {
))
}

/// Renders the NetEm chaos manifests for every group in `swarm_netem`.
///
/// For each group, the source and target peers are resolved to validator names and joined
/// with commas; a peer this swarm cannot resolve is rendered as `invalid-node`. Those labels
/// are substituted into the NetEm template together with the group's delay, loss and rate
/// settings. The per-group documents are returned as one string, separated by `---` lines.
fn create_netem_template(&self, swarm_netem: &SwarmNetEm) -> Result<String> {
    let rendered_groups = swarm_netem
        .group_netems
        .iter()
        .map(|group| {
            // Label list for the side of the link the chaos is attached to.
            let source_labels = group
                .source_nodes
                .iter()
                .map(|peer| {
                    self.validator(*peer)
                        .map_or("invalid-node", |validator| validator.name())
                })
                .collect::<Vec<_>>()
                .join(",");

            // Label list for the peers on the other side of the link.
            let target_labels = group
                .target_nodes
                .iter()
                .map(|peer| {
                    self.validator(*peer)
                        .map_or("invalid-node", |validator| validator.name())
                })
                .collect::<Vec<_>>()
                .join(",");

            format!(
                include_str!(NETEM_CHAOS_TEMPLATE!()),
                name = &group.name,
                namespace = self.kube_namespace,
                delay_latency_ms = group.delay_latency_ms,
                delay_jitter_ms = group.delay_jitter_ms,
                delay_correlation_percentage = group.delay_correlation_percentage,
                loss_percentage = group.loss_percentage,
                loss_correlation_percentage = group.loss_correlation_percentage,
                instance_labels = &source_labels,
                target_instance_labels = &target_labels,
                rate = group.rate_in_mbps,
            )
        })
        .collect::<Vec<_>>();

    Ok(rendered_groups.join("\n---\n"))
}

/// Renders the CPU stress manifests for every group in `swarm_cpu_stress`.
///
/// CPU stress lets validators running on identical hardware behave as if they had different
/// amounts of CPU available: for instance, a 4-core node can be made to look like a 2-core
/// node by setting `num_workers` to 2.
///
/// Each group's target peers are resolved to validator names and joined with commas; a peer
/// this swarm cannot resolve is rendered as `invalid-node`. The per-group documents are
/// returned as one string, separated by `---` lines.
fn create_cpu_stress_template(&self, swarm_cpu_stress: &SwarmCpuStress) -> Result<String> {
    let rendered_groups = swarm_cpu_stress
        .group_cpu_stresses
        .iter()
        .map(|group| {
            let target_labels = group
                .target_nodes
                .iter()
                .map(|peer| {
                    self.validator(*peer)
                        .map_or("invalid-node", |validator| validator.name())
                })
                .collect::<Vec<_>>()
                .join(",");

            format!(
                include_str!(CPU_STRESS_CHAOS_TEMPLATE!()),
                name = &group.name,
                namespace = self.kube_namespace,
                num_workers = group.num_workers,
                load_per_worker = group.load_per_worker,
                instance_labels = &target_labels,
            )
        })
        .collect::<Vec<_>>();

    Ok(rendered_groups.join("\n---\n"))
}

/// Picks the template builder that matches the `SwarmChaos` variant and returns the
/// manifest text it renders.
fn create_chaos_template(&self, chaos: &SwarmChaos) -> Result<String> {
    match chaos {
        SwarmChaos::Delay(delay) => self.create_network_delay_template(delay),
        SwarmChaos::Partition(partition) => self.create_network_partition_template(partition),
        SwarmChaos::Bandwidth(bandwidth) => self.create_network_bandwidth_template(bandwidth),
        SwarmChaos::Loss(loss) => self.create_network_loss_template(loss),
        SwarmChaos::NetEm(netem) => self.create_netem_template(netem),
        SwarmChaos::CpuStress(cpu_stress) => self.create_cpu_stress_template(cpu_stress),
    }
}

Expand Down
16 changes: 16 additions & 0 deletions testsuite/forge/src/backend/k8s/chaos/cpu_stress.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Chaos Mesh StressChaos manifest, used as a Rust format! template.
# Names in single curly braces are placeholders filled in when the template is rendered;
# doubled curly braces produce literal braces in the rendered YAML.
apiVersion: chaos-mesh.org/v1alpha1
kind: StressChaos
metadata:
namespace: {namespace}
name: {name}
spec:
mode: all
selector:
namespaces:
- {namespace}
expressionSelectors:
- {{ key: app.kubernetes.io/instance, operator: In, values: [{instance_labels}] }}
stressors:
cpu:
workers: {num_workers}
load: {load_per_worker}
32 changes: 32 additions & 0 deletions testsuite/forge/src/backend/k8s/chaos/netem.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Chaos Mesh NetworkChaos manifest for the netem action, used as a Rust format! template.
# Names in single curly braces are placeholders filled in when the template is rendered;
# doubled curly braces produce literal braces in the rendered YAML.
apiVersion: chaos-mesh.org/v1alpha1
kind: NetworkChaos
metadata:
namespace: {namespace}
name: {name}
spec:
action: netem
mode: all
selector:
namespaces:
- {namespace}
expressionSelectors:
- {{ key: app.kubernetes.io/instance, operator: In, values: [{instance_labels}] }}
delay:
latency: "{delay_latency_ms}ms"
correlation: "{delay_correlation_percentage}"
jitter: "{delay_jitter_ms}ms"
loss:
loss: "{loss_percentage}"
correlation: "{loss_correlation_percentage}"
bandwidth:
rate: "{rate}mbps"
limit: 20971520 # placeholder value. not supported by tc netem
buffer: 10000 # placeholder value. not supported by tc netem
direction: both
target:
selector:
namespaces:
- {namespace}
expressionSelectors:
- {{ key: app.kubernetes.io/instance, operator: In, values: [{target_instance_labels}] }}
mode: all
45 changes: 45 additions & 0 deletions testsuite/forge/src/interface/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub enum SwarmChaos {
Partition(SwarmNetworkPartition),
Bandwidth(SwarmNetworkBandwidth),
Loss(SwarmNetworkLoss),
NetEm(SwarmNetEm),
CpuStress(SwarmCpuStress),
}

#[derive(Eq, Hash, PartialEq, Debug, Clone)]
Expand Down Expand Up @@ -79,3 +81,46 @@ impl Display for SwarmNetworkLoss {
)
}
}

/// NetEm chaos description for a swarm, made up of one [`GroupNetEm`] entry per group.
#[derive(Eq, Hash, PartialEq, Debug, Clone)]
pub struct SwarmNetEm {
/// Per-group network emulation settings.
pub group_netems: Vec<GroupNetEm>,
}

impl Display for SwarmNetEm {
    /// Writes `NetEm nodes ` followed by the debug rendering of the group list.
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        let groups = &self.group_netems;
        write!(f, "NetEm nodes {:?}", groups)
    }
}

/// Network emulation settings applied between one set of source peers and one set of
/// target peers.
#[derive(Eq, Hash, PartialEq, Debug, Clone)]
pub struct GroupNetEm {
/// Name given to the chaos resource generated for this group.
pub name: String,
/// Peers the chaos is attached to (the manifest's primary selector).
pub source_nodes: Vec<PeerId>,
/// Peers on the other side of the emulated link (the manifest's target selector).
pub target_nodes: Vec<PeerId>,
/// Delay latency, in milliseconds.
pub delay_latency_ms: u64,
/// Delay jitter, in milliseconds.
pub delay_jitter_ms: u64,
/// Correlation value for the delay setting.
pub delay_correlation_percentage: u64,
/// Packet loss value.
pub loss_percentage: u64,
/// Correlation value for the loss setting.
pub loss_correlation_percentage: u64,
/// Bandwidth rate. NOTE(review): the k8s template renders this as `<n>mbps`; confirm
/// whether Chaos Mesh reads that unit as megabits or megabytes per second.
pub rate_in_mbps: u64,
}

/// CPU stress chaos description for a swarm, made up of one [`GroupCpuStress`] entry per
/// group.
#[derive(Eq, Hash, PartialEq, Debug, Clone)]
pub struct SwarmCpuStress {
/// Per-group CPU stress settings.
pub group_cpu_stresses: Vec<GroupCpuStress>,
}

impl Display for SwarmCpuStress {
    /// Writes `CpuStress nodes ` followed by the debug rendering of the group list.
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        let groups = &self.group_cpu_stresses;
        write!(f, "CpuStress nodes {:?}", groups)
    }
}

/// CPU stress settings applied to one group of peers.
#[derive(Eq, Hash, PartialEq, Debug, Clone)]
pub struct GroupCpuStress {
/// Name given to the chaos resource generated for this group.
pub name: String,
/// Peers whose pods receive the CPU stress.
pub target_nodes: Vec<PeerId>,
/// Number of CPU stress workers (the manifest's `stressors.cpu.workers`).
pub num_workers: u64,
/// Load value for each worker (the manifest's `stressors.cpu.load`).
pub load_per_worker: u64,
}
2 changes: 1 addition & 1 deletion testsuite/testcases/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub mod framework_upgrade;
pub mod fullnode_reboot_stress_test;
pub mod load_vs_perf_benchmark;
pub mod modifiers;
pub mod multi_region_simulation_test;
pub mod multi_region_network_test;
pub mod network_bandwidth_test;
pub mod network_loss_test;
pub mod network_partition_test;
Expand Down
84 changes: 82 additions & 2 deletions testsuite/testcases/src/modifiers.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{LoadDestination, NetworkLoadTest};
use aptos_forge::{NetworkContext, NetworkTest, Swarm, SwarmExt, Test};
use crate::{multi_region_network_test::chunk_validators, LoadDestination, NetworkLoadTest};
use aptos_forge::{
GroupCpuStress, NetworkContext, NetworkTest, Swarm, SwarmChaos, SwarmCpuStress, SwarmExt, Test,
};
use aptos_logger::info;
use aptos_types::PeerId;
use rand::Rng;
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -194,3 +197,80 @@ impl Test for NetworkUnreliabilityTest {
"NetworkUnreliabilityWrapper"
}
}

/// Settings for `CpuChaosTest`.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone` so the configuration can be
/// logged and compared by callers and tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CpuChaosConfig {
    /// Number of groups the validators are split into; also the worker count given to the
    /// first group (each later group gets one worker fewer).
    pub num_groups: usize,
    /// Load value applied to every stress worker.
    pub load_per_worker: u64,
}

impl Default for CpuChaosConfig {
    /// Four groups, each worker at a load of 100.
    fn default() -> Self {
        Self {
            num_groups: 4,
            load_per_worker: 100,
        }
    }
}

/// Test that injects CPU stress chaos on the swarm's validators in `setup` and removes it
/// again in `finish`.
pub struct CpuChaosTest {
/// Configuration to use instead of `CpuChaosConfig::default()`, when set.
pub override_config: Option<CpuChaosConfig>,
}

impl Test for CpuChaosTest {
// Static identifier this test returns through the `Test` trait.
fn name(&self) -> &'static str {
"CpuChaosWrapper"
}
}

/// Builds the CPU stress description for `all_validators`.
///
/// The validators are split into groups with `chunk_validators`. The group at index `i` is
/// named `group-<i>-cpu-stress` and is given `config.num_groups - i` workers, so the first
/// group is stressed the most. Every worker uses `config.load_per_worker`.
fn create_cpu_stress_template(
    all_validators: Vec<PeerId>,
    config: &CpuChaosConfig,
) -> SwarmCpuStress {
    let mut group_cpu_stresses = Vec::new();

    let groups = chunk_validators(all_validators, config.num_groups);
    for (group_idx, members) in groups.into_iter().enumerate() {
        group_cpu_stresses.push(GroupCpuStress {
            name: format!("group-{}-cpu-stress", group_idx),
            target_nodes: members,
            num_workers: (config.num_groups - group_idx) as u64,
            load_per_worker: config.load_per_worker,
        });
    }

    SwarmCpuStress { group_cpu_stresses }
}

impl NetworkLoadTest for CpuChaosTest {
    /// Injects CPU stress chaos covering every validator in the swarm, using the override
    /// configuration when present and the default otherwise.
    fn setup(&self, ctx: &mut NetworkContext) -> anyhow::Result<LoadDestination> {
        let config = self.override_config.clone().unwrap_or_default();

        let validator_ids: Vec<_> = ctx
            .swarm()
            .validators()
            .map(|validator| validator.peer_id())
            .collect();

        let chaos = SwarmChaos::CpuStress(create_cpu_stress_template(validator_ids, &config));
        ctx.swarm().inject_chaos(chaos)?;

        Ok(LoadDestination::FullnodesOtherwiseValidators)
    }

    /// Removes the CPU stress chaos. The description is rebuilt from the current validators
    /// and the same configuration rather than stored, so it matches what `setup` injected
    /// as long as the validator set has not changed in between.
    fn finish(&self, swarm: &mut dyn Swarm) -> anyhow::Result<()> {
        let config = self.override_config.clone().unwrap_or_default();

        let validator_ids: Vec<_> = swarm
            .validators()
            .map(|validator| validator.peer_id())
            .collect();

        let chaos = SwarmChaos::CpuStress(create_cpu_stress_template(validator_ids, &config));
        swarm.remove_chaos(chaos)
    }
}

impl NetworkTest for CpuChaosTest {
// Delegates to the `run` implementation provided on `dyn NetworkLoadTest`, passing this
// test as the load test to drive.
fn run<'t>(&self, ctx: &mut NetworkContext<'t>) -> anyhow::Result<()> {
<dyn NetworkLoadTest>::run(self, ctx)
}
}
Loading

0 comments on commit fa618bc

Please sign in to comment.