From c3b75cc2118697f29056bea1f90f40ce1b408fde Mon Sep 17 00:00:00 2001
From: Dongge Liu
Date: Mon, 29 Jan 2024 16:23:09 +1100
Subject: [PATCH 1/3] Use multi-thread for cloud experiments and multi-process
 for local ones

---
 run_all_experiments.py | 18 +++++++++++++++++-
 run_one_experiment.py  | 35 +++++++++++++++++++++++++++--------
 2 files changed, 44 insertions(+), 9 deletions(-)

diff --git a/run_all_experiments.py b/run_all_experiments.py
index 842352d17c..38a8a8feb6 100755
--- a/run_all_experiments.py
+++ b/run_all_experiments.py
@@ -18,7 +18,9 @@
 import logging
 import os
 import sys
+import time
 import traceback
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from multiprocessing import Pool
 
 import run_one_experiment
@@ -219,11 +221,25 @@ def main():
       result = run_experiments(*config)
       experiment_results.append(result)
       _print_experiment_result(result)
+  elif args.cloud_experiment_name:
+    # Use multi-threads for cloud experiments, because each thread only needs to
+    # wait for cloud build results or conduct simple I/O tasks.
+    with ThreadPoolExecutor(max_workers=NUM_EXP) as executor:
+      # Using list comprehension to submit tasks with arguments
+      futures = []
+      for config in experiment_configs:
+        futures.append(executor.submit(run_experiments, *config))
+        time.sleep(30)
+      for future in as_completed(futures):
+        result = future.result()
+        _print_experiment_result(result)
+        experiment_results.append(result)
   else:
+    # Use multi-process for local experiments, because each process needs to
+    # built fuzz targets in local docker containers.
     with Pool(NUM_EXP) as p:
       for result in p.starmap(run_experiments, experiment_configs):
         experiment_results.append(result)
-        _print_experiment_result(result)
 
   _print_experiment_results(experiment_results)
 
diff --git a/run_one_experiment.py b/run_one_experiment.py
index c0e424462d..3081bb7abb 100644
--- a/run_one_experiment.py
+++ b/run_one_experiment.py
@@ -19,6 +19,7 @@
 import os
 import shutil
 import subprocess
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from multiprocessing import pool
 from typing import List, Optional
 
@@ -174,14 +175,32 @@ def check_targets(
   evaluator = exp_evaluator.Evaluator(builder_runner, benchmark, work_dirs)
 
   ai_target_pairs = [(ai_binary, target) for target in generated_targets]
-  with pool.ThreadPool(NUM_EVA) as p:
-    for i, target_stat in enumerate(
-        p.starmap(evaluator.check_target, ai_target_pairs)):
-      if target_stat is None:
-        print(f'Error evaluating target {generated_targets[i]}')
-        continue
-
-      target_stats.append((i, target_stat))
+  if cloud_experiment_name:
+    # Use multi-threads for cloud experiments, because each thread only needs to
+    # wait for cloud build results or conduct simple I/O tasks.
+    with ThreadPoolExecutor(max_workers=NUM_EVA) as executor:
+      future_to_index = {
+          executor.submit(evaluator.check_target, *pair): i
+          for i, pair in enumerate(ai_target_pairs)
+      }
+      for future in as_completed(future_to_index):
+        i = future_to_index[future]
+        target_stat = future.result()
+        if target_stat is None:
+          print(f'Error evaluating target {generated_targets[i]}')
+          continue
+
+        target_stats.append((i, target_stat))
+  else:
+    # Use multi-process for local experiments, because each process needs to
+    # build fuzz targets in local docker containers.
+    with pool.ThreadPool(NUM_EVA) as p:
+      for i, target_stat in enumerate(
+          p.starmap(evaluator.check_target, ai_target_pairs)):
+        if target_stat is None:
+          print(f'Error evaluating target {generated_targets[i]}')
+          continue
+        target_stats.append((i, target_stat))
 
   if len(target_stats) > 0:
     return aggregate_results(target_stats, generated_targets)

From 2524e837191defc6380f63813c596d91bed11e32 Mon Sep 17 00:00:00 2001
From: Dongge Liu
Date: Mon, 29 Jan 2024 16:38:18 +1100
Subject: [PATCH 2/3] Justify sleep(30)

---
 run_all_experiments.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/run_all_experiments.py b/run_all_experiments.py
index 38a8a8feb6..b53ace3cd0 100755
--- a/run_all_experiments.py
+++ b/run_all_experiments.py
@@ -229,6 +229,10 @@ def main():
       futures = []
      for config in experiment_configs:
         futures.append(executor.submit(run_experiments, *config))
+        # Sleep to avoid having a peak CPU usage at the beginning because of
+        # creating too many threads.
+        # Approx. 30s is sufficient because these threads will soon become idle
+        # when waiting for cloud build results.
time.sleep(30) for future in as_completed(futures): result = future.result() From 55886e01d5c5877cb16f3375492055d35ef45abc Mon Sep 17 00:00:00 2001 From: Dongge Liu Date: Tue, 30 Jan 2024 16:38:08 +1100 Subject: [PATCH 3/3] Cleanup thread/process usage --- run_all_experiments.py | 70 ++++++++++++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 23 deletions(-) diff --git a/run_all_experiments.py b/run_all_experiments.py index b53ace3cd0..99ff8c8beb 100755 --- a/run_all_experiments.py +++ b/run_all_experiments.py @@ -53,7 +53,7 @@ def __init__(self, benchmark, result): self.result = result -def get_experiment_configs( +def _get_experiment_configs( args: argparse.Namespace ) -> list[tuple[benchmarklib.Benchmark, argparse.Namespace]]: """Constructs a list of experiment configs based on the |BENCHMARK_DIR| and @@ -207,43 +207,67 @@ def _print_experiment_results(results: list[Result]): f'{result.result}\n') +def _execute_benchmark_experiment( + config: tuple[benchmarklib.Benchmark, argparse.Namespace]) -> Result: + """ + Executes one experiment with one benchmark |config| and returns the result. + """ + result = run_experiments(*config) + _print_experiment_result(result) + return result + + +def parallelize_experiments_in_threads(configs): + """Executes experiments in a multi-threaded manner.""" + futures = [] + with ThreadPoolExecutor(max_workers=NUM_EXP) as executor: + for config in configs: + futures.append(executor.submit(_execute_benchmark_experiment, config)) + # Stagger the thread creation. + # Avoid having a peak CPU usage at the beginning because of creating + # too many threads. + # Approx. 30s is sufficient because these threads will soon become idle + # when waiting for cloud build results. 
+      time.sleep(30)
+  return [future.result() for future in as_completed(futures)]
+
+
+def parallelize_experiments_in_processes(configs):
+  """Executes experiments in a multi-process manner."""
+  results = []
+  with Pool(NUM_EXP) as pool:
+    for config in configs:
+      results.append(pool.apply_async(_execute_benchmark_experiment, (config,)))
+      # Stagger the process creation.
+      # Avoid having a peak CPU usage at the beginning because of creating
+      # too many processes.
+      # Approx. 30s is sufficient because these processes will soon become idle
+      # when waiting for cloud build results.
+      time.sleep(30)
+  return [result.get() for result in results]
+
+
 def main():
   logging.basicConfig(level=logging.INFO)
 
   args = parse_args()
   run_one_experiment.prepare()
 
-  experiment_configs = get_experiment_configs(args)
+  experiment_configs = _get_experiment_configs(args)
   experiment_results = []
 
   print(f'Running {NUM_EXP} experiment(s) in parallel.')
   if NUM_EXP == 1:
-    for config in experiment_configs:
-      result = run_experiments(*config)
-      experiment_results.append(result)
-      _print_experiment_result(result)
+    experiment_results = [
+        _execute_benchmark_experiment(config) for config in experiment_configs
+    ]
   elif args.cloud_experiment_name:
     # Use multi-threads for cloud experiments, because each thread only needs to
     # wait for cloud build results or conduct simple I/O tasks.
-    with ThreadPoolExecutor(max_workers=NUM_EXP) as executor:
-      # Using list comprehension to submit tasks with arguments
-      futures = []
-      for config in experiment_configs:
-        futures.append(executor.submit(run_experiments, *config))
-        # Sleep to avoid having a peak CPU usage at the beginning because of
-        # creating too many threads.
-        # Approx. 30s is sufficient because these threads will soon become idle
-        # when waiting for cloud build results.
-        time.sleep(30)
-      for future in as_completed(futures):
-        result = future.result()
-        _print_experiment_result(result)
-        experiment_results.append(result)
+    experiment_results = parallelize_experiments_in_threads(experiment_configs)
   else:
     # Use multi-process for local experiments, because each process needs to
     # built fuzz targets in local docker containers.
-    with Pool(NUM_EXP) as p:
-      for result in p.starmap(run_experiments, experiment_configs):
-        experiment_results.append(result)
+    experiment_results = parallelize_experiments_in_processes(experiment_configs)
 
   _print_experiment_results(experiment_results)