diff --git a/docs/webhook_events.md b/docs/webhook_events.md index 53964ed3b9..f208e72b34 100644 --- a/docs/webhook_events.md +++ b/docs/webhook_events.md @@ -399,6 +399,10 @@ Each event will be submitted via HTTP POST to the user provided URL. "title": "Generator Options", "type": "array" }, + "input_file": { + "title": "Input File", + "type": "string" + }, "minimized_stack_depth": { "title": "Minimized Stack Depth", "type": "integer" @@ -528,7 +532,8 @@ Each event will be submitted via HTTP POST to the user provided URL. "generic_merge", "generic_generator", "generic_crash_report", - "generic_regression" + "generic_regression", + "analysis_single" ], "title": "TaskType" }, @@ -1073,7 +1078,8 @@ Each event will be submitted via HTTP POST to the user provided URL. "generic_merge", "generic_generator", "generic_crash_report", - "generic_regression" + "generic_regression", + "analysis_single" ], "title": "TaskType" }, @@ -2143,6 +2149,10 @@ Each event will be submitted via HTTP POST to the user provided URL. "title": "Generator Options", "type": "array" }, + "input_file": { + "title": "Input File", + "type": "string" + }, "minimized_stack_depth": { "title": "Minimized Stack Depth", "type": "integer" @@ -2272,7 +2282,8 @@ Each event will be submitted via HTTP POST to the user provided URL. "generic_merge", "generic_generator", "generic_crash_report", - "generic_regression" + "generic_regression", + "analysis_single" ], "title": "TaskType" }, @@ -2855,6 +2866,10 @@ Each event will be submitted via HTTP POST to the user provided URL. "title": "Generator Options", "type": "array" }, + "input_file": { + "title": "Input File", + "type": "string" + }, "minimized_stack_depth": { "title": "Minimized Stack Depth", "type": "integer" @@ -2984,7 +2999,8 @@ Each event will be submitted via HTTP POST to the user provided URL. "generic_merge", "generic_generator", "generic_crash_report", - "generic_regression" + "generic_regression", + "analysis_single" ], "title": "TaskType" }, @@ -3358,6 +3374,10 @@ Each event will be submitted via HTTP POST to the user provided URL. "title": "Generator Options", "type": "array" }, + "input_file": { + "title": "Input File", + "type": "string" + }, "minimized_stack_depth": { "title": "Minimized Stack Depth", "type": "integer" @@ -3487,7 +3507,8 @@ Each event will be submitted via HTTP POST to the user provided URL. "generic_merge", "generic_generator", "generic_crash_report", - "generic_regression" + "generic_regression", + "analysis_single" ], "title": "TaskType" }, @@ -3806,6 +3827,10 @@ Each event will be submitted via HTTP POST to the user provided URL. "title": "Generator Options", "type": "array" }, + "input_file": { + "title": "Input File", + "type": "string" + }, "minimized_stack_depth": { "title": "Minimized Stack Depth", "type": "integer" @@ -3935,7 +3960,8 @@ Each event will be submitted via HTTP POST to the user provided URL. "generic_merge", "generic_generator", "generic_crash_report", - "generic_regression" + "generic_regression", + "analysis_single" ], "title": "TaskType" }, @@ -4228,6 +4254,10 @@ Each event will be submitted via HTTP POST to the user provided URL. "title": "Generator Options", "type": "array" }, + "input_file": { + "title": "Input File", + "type": "string" + }, "minimized_stack_depth": { "title": "Minimized Stack Depth", "type": "integer" @@ -4371,7 +4401,8 @@ Each event will be submitted via HTTP POST to the user provided URL. 
"generic_merge", "generic_generator", "generic_crash_report", - "generic_regression" + "generic_regression", + "analysis_single" ], "title": "TaskType" }, @@ -4677,6 +4708,10 @@ Each event will be submitted via HTTP POST to the user provided URL. "title": "Generator Options", "type": "array" }, + "input_file": { + "title": "Input File", + "type": "string" + }, "minimized_stack_depth": { "title": "Minimized Stack Depth", "type": "integer" @@ -4806,7 +4841,8 @@ Each event will be submitted via HTTP POST to the user provided URL. "generic_merge", "generic_generator", "generic_crash_report", - "generic_regression" + "generic_regression", + "analysis_single" ], "title": "TaskType" }, @@ -6299,6 +6335,10 @@ Each event will be submitted via HTTP POST to the user provided URL. "title": "Generator Options", "type": "array" }, + "input_file": { + "title": "Input File", + "type": "string" + }, "minimized_stack_depth": { "title": "Minimized Stack Depth", "type": "integer" @@ -6442,7 +6482,8 @@ Each event will be submitted via HTTP POST to the user provided URL. "generic_merge", "generic_generator", "generic_crash_report", - "generic_regression" + "generic_regression", + "analysis_single" ], "title": "TaskType" }, diff --git a/src/agent/onefuzz-agent/src/tasks/analysis/mod.rs b/src/agent/onefuzz-agent/src/tasks/analysis/mod.rs index 87af96aae5..209f96a8d5 100644 --- a/src/agent/onefuzz-agent/src/tasks/analysis/mod.rs +++ b/src/agent/onefuzz-agent/src/tasks/analysis/mod.rs @@ -2,3 +2,4 @@ // Licensed under the MIT License. pub mod generic; +pub mod single; diff --git a/src/agent/onefuzz-agent/src/tasks/analysis/single.rs b/src/agent/onefuzz-agent/src/tasks/analysis/single.rs new file mode 100644 index 0000000000..9ed562089e --- /dev/null +++ b/src/agent/onefuzz-agent/src/tasks/analysis/single.rs @@ -0,0 +1,124 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+
+use crate::tasks::{config::CommonConfig, heartbeat::HeartbeatSender};
+use anyhow::{Context, Result};
+use onefuzz::{expand::Expand, fs::set_executable, process::monitor_process, syncdir::SyncedDir};
+use serde::Deserialize;
+use std::time::Duration;
+use std::{collections::HashMap, path::PathBuf, process::Stdio};
+use tokio::process::Command;
+
+const INITIAL_DELAY: Duration = Duration::from_millis(1);
+
+#[derive(Debug, Deserialize)]
+pub struct Config {
+    pub analyzer_exe: String,
+    pub analyzer_options: Vec<String>,
+    pub analyzer_env: HashMap<String, String>,
+
+    pub target_exe: PathBuf,
+    pub target_options: Vec<String>,
+    pub crashes: SyncedDir,
+    pub input_file: String,
+
+    pub tools: Option<SyncedDir>,
+
+    #[serde(flatten)]
+    pub common: CommonConfig,
+}
+
+pub async fn run(config: Config) -> Result<()> {
+    config
+        .crashes
+        .init_pull()
+        .await
+        .context("unable to sync crashes")?;
+    if let Some(tools) = &config.tools {
+        tools.init_pull().await.context("unable to sync tools")?;
+        set_executable(&tools.local_path)
+            .await
+            .context("unable to set tools as executable")?;
+    }
+
+    run_tool(&config).await
+}
+
+pub async fn run_tool(config: &Config) -> Result<()> {
+    let heartbeat = config.common.init_heartbeat(Some(INITIAL_DELAY)).await?;
+    let expand = Expand::new()
+        .target_exe(&config.target_exe)
+        .target_options(&config.target_options)
+        .analyzer_exe(&config.analyzer_exe)
+        .analyzer_options(&config.analyzer_options)
+        .crashes(&config.crashes.local_path)
+        .set_optional_ref(&config.tools, |tester, key| {
+            tester.tools_dir(&key.local_path)
+        })
+        .setup_dir(&config.common.setup_dir)
+        .job_id(&config.common.job_id)
+        .task_id(&config.common.task_id)
+        .set_optional_ref(&config.common.microsoft_telemetry_key, |tester, key| {
+            tester.microsoft_telemetry_key(&key)
+        })
+        .set_optional_ref(&config.common.instance_telemetry_key, |tester, key| {
+            tester.instance_telemetry_key(&key)
+        })
+        .set_optional_ref(
+            &config.crashes.remote_path.clone().and_then(|u| u.account()),
+            |tester, account| tester.crashes_account(account),
+        )
+        .set_optional_ref(
+            &config
+                .crashes
+                .remote_path
+                .clone()
+                .and_then(|u| u.container()),
+            |tester, container| tester.crashes_container(container),
+        );
+
+    let input_path = expand
+        .evaluate_value(format!("{{crashes}}/{}", config.input_file))
+        .context("unable to expand input_path")?;
+    let expand = expand.input_path(input_path);
+
+    let analyzer_path = expand
+        .evaluate_value(&config.analyzer_exe)
+        .context("expanding analyzer_exe failed")?;
+
+    loop {
+        let mut cmd = Command::new(&analyzer_path);
+        cmd.kill_on_drop(true)
+            .env_remove("RUST_LOG")
+            .stdin(Stdio::null())
+            .stdout(Stdio::piped())
+            .stderr(Stdio::piped());
+
+        for arg in expand.evaluate(&config.analyzer_options)? {
+            cmd.arg(arg);
+        }
+
+        for (k, v) in &config.analyzer_env {
+            cmd.env(
+                k,
+                expand
+                    .evaluate_value(v)
+                    .context("expanding analyzer_env failed")?,
+            );
+        }
+
+        info!("analyzing input with {:?}", cmd);
+        let output = cmd
+            .spawn()
+            .with_context(|| format!("analyzer failed to start: {}", analyzer_path))?;
+
+        heartbeat.alive();
+
+        // while we monitor the runtime of the debugger, we don't fail the task if
+        // the debugger exits non-zero. This frequently happens during normal use of
+        // debuggers.
+ monitor_process(output, "crash-repro".to_string(), true, None) + .await + .ok(); + } +} diff --git a/src/agent/onefuzz-agent/src/tasks/config.rs b/src/agent/onefuzz-agent/src/tasks/config.rs index 5a15347a44..e418037c69 100644 --- a/src/agent/onefuzz-agent/src/tasks/config.rs +++ b/src/agent/onefuzz-agent/src/tasks/config.rs @@ -100,6 +100,9 @@ pub enum Config { #[serde(alias = "generic_regression")] GenericRegression(regression::generic::Config), + + #[serde(alias = "analysis_single")] + AnalysisSingle(analysis::single::Config), } impl Config { @@ -130,6 +133,7 @@ impl Config { Config::GenericSupervisor(c) => &mut c.common, Config::GenericGenerator(c) => &mut c.common, Config::GenericRegression(c) => &mut c.common, + Config::AnalysisSingle(c) => &mut c.common, } } @@ -149,6 +153,7 @@ impl Config { Config::GenericSupervisor(c) => &c.common, Config::GenericGenerator(c) => &c.common, Config::GenericRegression(c) => &c.common, + Config::AnalysisSingle(c) => &c.common, } } @@ -168,6 +173,7 @@ impl Config { Config::GenericSupervisor(_) => "generic_supervisor", Config::GenericGenerator(_) => "generic_generator", Config::GenericRegression(_) => "generic_regression", + Config::AnalysisSingle(_) => "analysis_single", }; match self { @@ -177,6 +183,9 @@ impl Config { Config::GenericAnalysis(c) => { event!(task_start; EventData::Type = event_type, EventData::ToolName = c.analyzer_exe.clone()); } + Config::AnalysisSingle(c) => { + event!(task_start; EventData::Type = event_type, EventData::ToolName = c.analyzer_exe.clone()); + } _ => { event!(task_start; EventData::Type = event_type); } @@ -238,6 +247,7 @@ impl Config { .run() .await } + Config::AnalysisSingle(config) => analysis::single::run(config).await, } } } diff --git a/src/api-service/__app__/onefuzzlib/tasks/config.py b/src/api-service/__app__/onefuzzlib/tasks/config.py index 8b03ceb224..cee3797ba5 100644 --- a/src/api-service/__app__/onefuzzlib/tasks/config.py +++ b/src/api-service/__app__/onefuzzlib/tasks/config.py @@ -380,6 +380,9 @@ def build_task_config( else True ) + if TaskFeature.input_file in definition.features: + config.input_file = task_config.task.input_file + if TaskFeature.coverage_filter in definition.features: coverage_filter = task_config.task.coverage_filter diff --git a/src/api-service/__app__/onefuzzlib/tasks/defs.py b/src/api-service/__app__/onefuzzlib/tasks/defs.py index 33bdd80a7a..90edfeaffe 100644 --- a/src/api-service/__app__/onefuzzlib/tasks/defs.py +++ b/src/api-service/__app__/onefuzzlib/tasks/defs.py @@ -592,4 +592,35 @@ ), ], ), + TaskType.analysis_single: TaskDefinition( + features=[ + TaskFeature.analyzer_exe, + TaskFeature.analyzer_env, + TaskFeature.analyzer_options, + TaskFeature.target_exe, + TaskFeature.target_options, + TaskFeature.input_file, + ], + vm=VmDefinition(compare=Compare.AtLeast, value=1), + containers=[ + ContainerDefinition( + type=ContainerType.setup, + compare=Compare.Equal, + value=1, + permissions=[ContainerPermission.Read, ContainerPermission.List], + ), + ContainerDefinition( + type=ContainerType.crashes, + compare=Compare.Equal, + value=1, + permissions=[ContainerPermission.Read, ContainerPermission.List], + ), + ContainerDefinition( + type=ContainerType.tools, + compare=Compare.AtMost, + value=1, + permissions=[ContainerPermission.Read, ContainerPermission.List], + ), + ], + ), } diff --git a/src/cli/onefuzz/api.py b/src/cli/onefuzz/api.py index afb219bf0d..d2e69e5042 100644 --- a/src/cli/onefuzz/api.py +++ b/src/cli/onefuzz/api.py @@ -12,7 +12,7 @@ import uuid from enum import 
Enum from shutil import which -from typing import Callable, Dict, List, Optional, Tuple, Type, TypeVar +from typing import Callable, Dict, List, Optional, Tuple, Type, TypeVar, Union from uuid import UUID import semver @@ -56,6 +56,7 @@ class PreviewFeature(Enum): job_templates = "job_templates" + crash_repro_task = "crash_repro_task" def is_uuid(value: str) -> bool: @@ -480,6 +481,304 @@ def reset( self.delete(container.name) +class LiveRepro(Endpoint): + """Live debug a crash using pools - PREVIEW FEATURE""" + + def create( + self, + container: primitives.Container, + path: str, + *, + pool_name: Optional[primitives.PoolName] = None, + duration: int = 1, + tags: Optional[Dict[str, str]] = None, + analyzer_env: Optional[Dict[str, str]] = {"ASAN_OPTIONS": "abort_on_error=1"}, + analyzer_exe: Optional[str] = None, + analyzer_options: Optional[List[str]] = None, + ) -> models.Task: + self.onefuzz._warn_preview(PreviewFeature.crash_repro_task) + raw_report = self.onefuzz.containers.files.get(container, path) + report = models.Report.parse_raw(raw_report) + if report.input_blob is None: + raise Exception("report does not include input_blob") + job = self.onefuzz.jobs.get(report.job_id) + task = self.onefuzz.tasks.get(report.task_id) + + if pool_name is None and task.config.pool: + pool_name = task.config.pool.pool_name + + if pool_name is None: + raise Exception("missing pool_name") + + pool = self.onefuzz.pools.get(pool_name) + if not pool.managed: + raise Exception("live repro is only supported on managed pools") + + if analyzer_exe is None: + if pool.os == enums.OS.windows: + analyzer_exe = "cdb.exe" + if analyzer_options is None: + analyzer_options = [ + "-server", + "tcp:port=1337", + "{target_exe}", + "{input}", + ] + else: + analyzer_exe = "gdbserver" + if analyzer_options is None: + analyzer_options = ["localhost:1337", "{target_exe}", "{input}"] + + if tags is None: + tags = {} + + containers = [ + x for x in task.config.containers if x.type == enums.ContainerType.setup + ] + containers.append( + models.TaskContainers( + name=report.input_blob.container, type=enums.ContainerType.crashes + ) + ) + + job = self.onefuzz.jobs.create( + job.config.project, job.config.name, job.config.build, duration=duration + ) + task_config = models.TaskConfig( + job_id=job.job_id, + pool=models.TaskPool(pool_name=pool_name, count=1), + containers=containers, + colocate=False, + tags=tags, + task=models.TaskDetails( + duration=duration, + type=enums.TaskType.analysis_single, + input_file=report.input_blob.name, + target_exe=task.config.task.target_exe, + target_options=task.config.task.target_options, + analyzer_exe=analyzer_exe, + analyzer_env=analyzer_env, + analyzer_options=analyzer_options, + ), + ) + return self.onefuzz.tasks.create_with_config(task_config) + + def _get_task_checked(self, task_id: UUID) -> models.Task: + task = self.onefuzz.tasks.get(task_id) + + if task.error: + raise Exception("task failed: %s" % task.error.json()) + + if task.state in enums.TaskState.shutting_down(): + raise Exception("task stopped") + return task + + def connect( + self, + task_id: UUID_EXPANSION, + *, + public_key_path: primitives.File = primitives.File( + os.path.expanduser("~/.ssh/id_rsa.pub") + ), + private_key_path: Optional[primitives.File] = None, + debug_command: Optional[str] = None, + stop_after_use: bool = False, + ) -> Optional[str]: + task_id_expanded: UUID = self._disambiguate_uuid( + "task_id", + task_id, + lambda: [str(x.task_id) for x in self.onefuzz.tasks.list()], + ) + + def missing_node() -> 
Tuple[bool, str, models.Task]: + task = self._get_task_checked(task_id_expanded) + return ( + bool(task.nodes), + "waiting task to be assigned to node", + task, + ) + + task = wait(missing_node) + if task.error: + raise Exception("task failed: %s" % task.error.json()) + + if task.config.task.type != enums.TaskType.analysis_single: + raise Exception("invalid task type") + + if not task.nodes: + raise Exception("missing nodes") + node_assignment = task.nodes[0] + if not node_assignment.scaleset_id: + raise Exception("task got assigned to unmanaged pool") + node_id = node_assignment.node_id + + proxy = self.onefuzz.scaleset_proxy.create( + node_assignment.scaleset_id, node_id, 22, duration=task.config.task.duration + ) + + with open(public_key_path, "r") as handle: + self.onefuzz.nodes.add_ssh_key(node_id, public_key=handle.read()) + + def wait_for_key() -> Tuple[bool, str, models.Node]: + self._get_task_checked(task_id_expanded) + node = self.onefuzz.nodes.get(node_id) + return (not bool(node.messages), "waiting for node to add ssh key", node) + + node = wait(wait_for_key) + pool = self.onefuzz.pools.get(node.pool_name) + + def missing_ip() -> Tuple[bool, str, responses.ProxyGetResult]: + self._get_task_checked(task_id_expanded) + if node.scaleset_id is None: + raise Exception("node got assigned to unmanaged node") + + proxy = self.onefuzz.scaleset_proxy.get( + node.scaleset_id, node.machine_id, 22 + ) + return (proxy.ip is not None, "waiting for proxy", proxy) + + proxy = wait(missing_ip) + if not proxy.ip: + raise Exception("missing ip") + + def waiting_for_heartbeat() -> Tuple[bool, str, models.Task]: + task = self._get_task_checked(task_id_expanded) + return ( + task.heartbeat is not None, + "waiting for task heartbeat", + task, + ) + + task = wait(waiting_for_heartbeat) + + cmd = {enums.OS.linux: self._dbg_linux, enums.OS.windows: self._dbg_windows} + result = cmd[pool.os]( + proxy.ip, + proxy.forward.src_port, + private_key_path=private_key_path, + debug_command=debug_command, + ) + if stop_after_use: + self.onefuzz.tasks.delete(task_id_expanded) + return result + + def create_and_connect( + self, + container: primitives.Container, + path: str, + *, + pool_name: Optional[primitives.PoolName], + public_key_path: primitives.File = primitives.File( + os.path.expanduser("~/.ssh/id_rsa.pub") + ), + private_key_path: Optional[primitives.File] = None, + duration: int = 1, + tags: Optional[Dict[str, str]] = None, + debug_command: Optional[str] = None, + analyzer_env: Optional[Dict[str, str]] = {"ASAN_OPTIONS": "abort_on_error=1"}, + analyzer_exe: Optional[str] = None, + analyzer_options: Optional[List[str]] = None, + stop_after_use: bool = False, + ) -> Optional[str]: + + task = self.create( + container, + path, + pool_name=pool_name, + duration=duration, + tags=tags, + analyzer_env=analyzer_env, + analyzer_exe=analyzer_exe, + analyzer_options=analyzer_options, + ) + self.logger.info("task created: %s", task.task_id) + return self.connect( + task.task_id, + public_key_path=public_key_path, + private_key_path=private_key_path, + debug_command=debug_command, + stop_after_use=stop_after_use, + ) + + def _dbg_linux( + self, + ip: str, + port: int, + *, + debug_command: Optional[str], + private_key_path: Optional[primitives.File], + ) -> Optional[str]: + """Launch gdb with GDB script that includes 'target remote | ssh ...'""" + + with ssh_connect( + ip, port=port, proxy=REPRO_SSH_FORWARD, private_key_path=private_key_path + ): + gdb_script = ["target remote localhost:1337"] + if debug_command: + 
gdb_script += [debug_command, "quit"] + + with temp_file("gdb.script", "\n".join(gdb_script)) as gdb_script_path: + dbg = ["gdb", "--silent", "--command", gdb_script_path] + + if debug_command: + dbg += ["--batch"] + + try: + # security note: dbg is built from content coming from + # the server, which is trusted in this context. + return subprocess.run( # nosec + dbg, stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ).stdout.decode(errors="ignore") + except subprocess.CalledProcessError as err: + self.logger.error( + "debug failed: %s", err.output.decode(errors="ignore") + ) + raise err + else: + # security note: dbg is built from content coming from the + # server, which is trusted in this context. + subprocess.call(dbg) # nosec + return None + + def _dbg_windows( + self, + ip: str, + port: int, + *, + debug_command: Optional[str], + private_key_path: Optional[primitives.File], + ) -> Optional[str]: + """Setup an SSH tunnel, then connect via CDB over SSH tunnel""" + bind_all = which("wslpath") is not None + proxy = "*:" + REPRO_SSH_FORWARD if bind_all else REPRO_SSH_FORWARD + with ssh_connect(ip, port=port, private_key_path=private_key_path, proxy=proxy): + dbg = ["cdb.exe", "-remote", "tcp:port=1337,server=localhost"] + if debug_command: + dbg_script = [debug_command, "qq"] + with temp_file("db.script", "\r\n".join(dbg_script)) as dbg_script_path: + dbg += ["-cf", _wsl_path(dbg_script_path)] + + logging.debug("launching: %s", dbg) + try: + # security note: dbg is built from content coming from the server, + # which is trusted in this context. + return subprocess.run( # nosec + dbg, stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ).stdout.decode(errors="ignore") + except subprocess.CalledProcessError as err: + self.logger.error( + "debug failed: %s", err.output.decode(errors="ignore") + ) + raise err + else: + logging.debug("launching: %s", dbg) + # security note: dbg is built from content coming from the + # server, which is trusted in this context. 
+ subprocess.call(dbg) # nosec + + return None + + class Repro(Endpoint): """Interact with Reproduction VMs""" @@ -538,7 +837,7 @@ def _dbg_linux( raise Exception("vm setup failed: %s" % repro.state) with build_ssh_command( - repro.ip, repro.auth.private_key, command="-T" + repro.ip, private_key=repro.auth.private_key, command="-T" ) as ssh_cmd: gdb_script = [ @@ -586,7 +885,7 @@ def _dbg_windows( bind_all = which("wslpath") is not None and repro.os == enums.OS.windows proxy = "*:" + REPRO_SSH_FORWARD if bind_all else REPRO_SSH_FORWARD - with ssh_connect(repro.ip, repro.auth.private_key, proxy=proxy): + with ssh_connect(repro.ip, private_key=repro.auth.private_key, proxy=proxy): dbg = ["cdb.exe", "-remote", "tcp:port=1337,server=localhost"] if debug_command: dbg_script = [debug_command, "qq"] @@ -1551,7 +1850,7 @@ def create( machine_id_expanded, ) = self.onefuzz.scalesets._expand_scaleset_machine(scaleset_id, machine_id) - self.logger.debug( + self.logger.info( "create proxy: %s:%s %d", scaleset.scaleset_id, machine_id_expanded, @@ -1654,7 +1953,6 @@ def __init__( config=DEFAULT, config_path=config_path, token_path=token_path ) self.containers = Containers(self) - self.repro = Repro(self) self.notifications = Notifications(self) self.tasks = Tasks(self) self.jobs = Jobs(self) @@ -1670,6 +1968,10 @@ def __init__( if self._backend.is_feature_enabled(PreviewFeature.job_templates.name): self.job_templates = JobTemplates(self) + self.repro: Union[LiveRepro, Repro] = Repro(self) + if self._backend.is_feature_enabled(PreviewFeature.crash_repro_task.name): + self.repro = LiveRepro(self) + # these are externally developed cli modules self.template = Template(self, self.logger) self.debug = Debug(self, self.logger) @@ -1812,7 +2114,7 @@ def _delete_components( ) self.notifications.delete(notification.notification_id) - if repros: + if repros and isinstance(self.repro, Repro): for vm in self.repro.list(): self.repro.delete(str(vm.vm_id)) diff --git a/src/cli/onefuzz/debug.py b/src/cli/onefuzz/debug.py index c00a80b925..df53cfdce9 100644 --- a/src/cli/onefuzz/debug.py +++ b/src/cli/onefuzz/debug.py @@ -20,7 +20,7 @@ from onefuzztypes.models import BlobRef, Job, NodeAssignment, Report, Task, TaskConfig from onefuzztypes.primitives import Container, Directory, PoolName -from onefuzz.api import UUID_EXPANSION, Command, Onefuzz +from onefuzz.api import UUID_EXPANSION, Command, LiveRepro, Onefuzz from .backend import wait from .rdp import rdp_connect @@ -37,11 +37,16 @@ class DebugRepro(Command): """Debug repro instances""" def _disambiguate(self, vm_id: UUID_EXPANSION) -> str: + if isinstance(self.onefuzz.repro, LiveRepro): + raise Exception("not supported on LiveRepro") + + repro_list = self.onefuzz.repro.list + return str( self.onefuzz.repro._disambiguate_uuid( "vm_id", vm_id, - lambda: [str(x.vm_id) for x in self.onefuzz.repro.list()], + lambda: [str(x.vm_id) for x in repro_list()], ) ) @@ -50,6 +55,9 @@ def _info(self) -> Tuple[str, str]: return info.resource_group, info.subscription def ssh(self, vm_id: str) -> None: + if isinstance(self.onefuzz.repro, LiveRepro): + raise Exception("not supported on LiveRepro. 
Use onefuzz.debug.tasks.ssh") + vm_id = self._disambiguate(vm_id) repro = self.onefuzz.repro.get(vm_id) if repro.ip is None: @@ -57,10 +65,13 @@ def ssh(self, vm_id: str) -> None: if repro.auth is None: raise Exception("missing Auth: %s" % repro) - with ssh_connect(repro.ip, repro.auth.private_key, call=True): + with ssh_connect(repro.ip, private_key=repro.auth.private_key, call=True): pass def rdp(self, vm_id: str) -> None: + if isinstance(self.onefuzz.repro, LiveRepro): + raise Exception("not supported on LiveRepro. Use onefuzz.debug.task.rdp") + vm_id = self._disambiguate(vm_id) repro = self.onefuzz.repro.get(vm_id) if repro.ip is None: @@ -165,7 +176,11 @@ def ssh( raise Exception("auth is not available for scaleset") with ssh_connect( - ip, scaleset.auth.private_key, port=port, call=True, command=command + ip, + private_key=scaleset.auth.private_key, + port=port, + call=True, + command=command, ): return diff --git a/src/cli/onefuzz/repro.py b/src/cli/onefuzz/repro.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/cli/onefuzz/ssh.py b/src/cli/onefuzz/ssh.py index 7782926a0a..af8dfa1763 100644 --- a/src/cli/onefuzz/ssh.py +++ b/src/cli/onefuzz/ssh.py @@ -10,7 +10,7 @@ import tempfile from asyncio.subprocess import PIPE from contextlib import contextmanager -from typing import Generator, Optional +from typing import Generator, List, Optional def get_local_tmp() -> Optional[str]: @@ -52,54 +52,89 @@ def temp_file( logging.debug("cleaning up file %s", full_path) +def build_ssh_command_args( + *, + ip: str, + private_key_path: Optional[str] = None, + proxy: Optional[str] = None, + port: Optional[int] = None, + command: Optional[str] = None, +) -> List[str]: + cmd = [ + "ssh", + "onefuzz@%s" % ip, + "-o", + "UserKnownHostsFile=/dev/null", + "-o", + "StrictHostKeyChecking=no", + ] + + if private_key_path: + cmd += ["-i", private_key_path] + if proxy: + cmd += ["-L", proxy] + if port: + cmd += ["-p", str(port)] + + log_level = logging.getLogger("nsv-backend").getEffectiveLevel() + if log_level <= logging.DEBUG: + cmd += ["-v"] + + if command: + cmd += [command] + return cmd + + @contextmanager def build_ssh_command( ip: str, - private_key: str, *, + private_key_path: Optional[str] = None, + private_key: Optional[str] = None, proxy: Optional[str] = None, port: Optional[int] = None, command: Optional[str] = None, ) -> Generator: - with temp_file("id_rsa", private_key, set_owner_only=True) as ssh_key: - cmd = [ - "ssh", - "onefuzz@%s" % ip, - "-i", - ssh_key, - "-o", - "UserKnownHostsFile=/dev/null", - "-o", - "StrictHostKeyChecking=no", - ] - - if proxy: - cmd += ["-L", proxy] - if port: - cmd += ["-p", str(port)] - - log_level = logging.getLogger("nsv-backend").getEffectiveLevel() - if log_level <= logging.DEBUG: - cmd += ["-v"] - - if command: - cmd += [command] - - yield cmd + if private_key is not None and private_key_path is not None: + raise Exception("private_key and private_key_path are mutually exclusive") + + if private_key is not None: + with temp_file("id_rsa", private_key, set_owner_only=True) as private_key_path: + yield build_ssh_command_args( + ip=ip, + proxy=proxy, + port=port, + command=command, + private_key_path=private_key_path, + ) + else: + yield build_ssh_command_args( + ip=ip, + proxy=proxy, + port=port, + command=command, + private_key_path=private_key_path, + ) @contextmanager def ssh_connect( ip: str, - private_key: str, *, + private_key_path: Optional[str] = None, + private_key: Optional[str] = None, proxy: Optional[str] = None, call: bool = 
False, port: Optional[int] = None, command: Optional[str] = None, ) -> Generator: with build_ssh_command( - ip, private_key, proxy=proxy, port=port, command=command + ip, + private_key=private_key, + private_key_path=private_key_path, + proxy=proxy, + port=port, + command=command, ) as cmd: logging.info("launching ssh: %s", " ".join(cmd)) diff --git a/src/integration-tests/integration-test.py b/src/integration-tests/integration-test.py index 51a8a03dae..9398d29181 100755 --- a/src/integration-tests/integration-test.py +++ b/src/integration-tests/integration-test.py @@ -28,11 +28,11 @@ from uuid import UUID, uuid4 import requests -from onefuzz.api import Command, Onefuzz +from onefuzz.api import Command, LiveRepro, Onefuzz from onefuzz.backend import ContainerWrapper, wait from onefuzz.cli import execute_api -from onefuzztypes.enums import OS, ContainerType, TaskState, VmState -from onefuzztypes.models import Job, Pool, Repro, Scaleset, Task +from onefuzztypes.enums import OS, ContainerType, TaskState +from onefuzztypes.models import Job, Pool, Scaleset, Task from onefuzztypes.primitives import Container, Directory, File, PoolName, Region from pydantic import BaseModel, Field @@ -539,27 +539,34 @@ def get_job_crash_report(self, job_id: UUID) -> Optional[Tuple[Container, str]]: return (container.name, files.files[0]) return None - def launch_repro(self) -> Tuple[bool, Dict[UUID, Tuple[Job, Repro]]]: + def check_repro(self) -> bool: # launch repro for one report from all succeessful jobs has_cdb = bool(which("cdb.exe")) has_gdb = bool(which("gdb")) jobs = self.get_jobs() + success = True + commands: Dict[OS, Tuple[str, str]] = { + OS.windows: ("g\r\nr rip", r"^rip=[a-f0-9]{16}"), + OS.linux: ("info reg rip", r"^rip\s+0x[a-f0-9]+\s+0x[a-f0-9]+"), + } + + pools = {x.os: x.name for x in self.get_pools()} - result = True - repros = {} for job in jobs: if not TARGETS[job.config.name].test_repro: self.logger.info("not testing repro for %s", job.config.name) continue - if TARGETS[job.config.name].os == OS.linux and not has_gdb: + target_os = TARGETS[job.config.name].os + + if target_os == OS.linux and not has_gdb: self.logger.warning( "skipping repro for %s, missing gdb", job.config.name ) continue - if TARGETS[job.config.name].os == OS.windows and not has_cdb: + if target_os == OS.windows and not has_cdb: self.logger.warning( "skipping repro for %s, missing cdb", job.config.name ) @@ -570,90 +577,38 @@ def launch_repro(self) -> Tuple[bool, Dict[UUID, Tuple[Job, Repro]]]: self.logger.error( "target does not include crash reports: %s", job.config.name ) - result = False - else: - self.logger.info("launching repro: %s", job.config.name) - (container, path) = report - repro = self.of.repro.create(container, path, duration=1) - repros[job.job_id] = (job, repro) - - return (result, repros) - - def check_repro(self, repros: Dict[UUID, Tuple[Job, Repro]]) -> bool: - self.logger.info("checking repros") - self.success = True - - def check_repro_impl() -> Tuple[bool, str, bool]: - # check all of the launched repros - - self.cleared = False - - def clear() -> None: - if not self.cleared: - self.cleared = True - print("") + continue - commands: Dict[OS, Tuple[str, str]] = { - OS.windows: ("r rip", r"^rip=[a-f0-9]{16}"), - OS.linux: ("info reg rip", r"^rip\s+0x[a-f0-9]+\s+0x[a-f0-9]+"), - } + self.logger.info("launching repro: %s", job.config.name) + (container, path) = report + + if isinstance(self.of.repro, LiveRepro): + result = self.of.repro.create_and_connect( + container, + path, + 
pool_name=pools[target_os], + duration=1, + debug_command=commands[target_os][0], + stop_after_use=True, + ) + else: + result = self.of.repro.create_and_connect( + container, + path, + duration=1, + debug_command=commands[target_os][0], + delete_after_use=True, + ) - for (job, repro) in list(repros.values()): - repros[job.job_id] = (job, self.of.repro.get(repro.vm_id)) + if result is not None and re.search( + commands[target_os][1], result, re.MULTILINE + ): + self.logger.info("repro succeeded: %s", job.config.name) + else: + self.logger.error("repro failed: %s - %s", job.config.name, result) + success = False - for (job, repro) in list(repros.values()): - if repro.error: - clear() - self.logger.error( - "repro failed: %s: %s", - job.config.name, - repro.error, - ) - self.of.repro.delete(repro.vm_id) - del repros[job.job_id] - elif repro.state == VmState.running: - try: - result = self.of.repro.connect( - repro.vm_id, - delete_after_use=True, - debug_command=commands[repro.os][0], - ) - if result is not None and re.search( - commands[repro.os][1], result, re.MULTILINE - ): - clear() - self.logger.info("repro succeeded: %s", job.config.name) - else: - clear() - self.logger.error( - "repro failed: %s - %s", job.config.name, result - ) - except Exception as err: - clear() - self.logger.error("repro failed: %s - %s", job.config.name, err) - del repros[job.job_id] - elif repro.state not in [VmState.init, VmState.extensions_launch]: - self.logger.error( - "repro failed: %s - bad state: %s", job.config.name, repro.state - ) - del repros[job.job_id] - - repro_states: Dict[str, List[str]] = {} - for (job, repro) in repros.values(): - if repro.state.name not in repro_states: - repro_states[repro.state.name] = [] - repro_states[repro.state.name].append(job.config.name) - - logline = [] - for state in repro_states: - logline.append("%s:%s" % (state, ",".join(repro_states[state]))) - - msg = "waiting repro: %s" % " ".join(logline) - if len(msg) > 80: - msg = "waiting on %d repros" % len(repros) - return (not bool(repros), msg, self.success) - - return wait(check_repro_impl) + return success def get_jobs(self) -> List[Job]: jobs = self.of.jobs.list(job_state=None) @@ -707,13 +662,16 @@ def cleanup(self) -> None: ]: container_names.add(container.name) - for repro in self.of.repro.list(): - if repro.config.container in container_names: - try: - self.of.repro.delete(repro.vm_id) - except Exception as e: - self.logger.error("cleanup of repro failed: %s %s", repro.vm_id, e) - errors.append(e) + if not isinstance(self.of.repro, LiveRepro): + for repro in self.of.repro.list(): + if repro.config.container in container_names: + try: + self.of.repro.delete(repro.vm_id) + except Exception as e: + self.logger.error( + "cleanup of repro failed: %s %s", repro.vm_id, e + ) + errors.append(e) if errors: raise Exception("cleanup failed") @@ -850,9 +808,9 @@ def check_jobs( def check_repros(self, test_id: UUID, *, endpoint: Optional[str]) -> None: self.onefuzz.__setup__(endpoint=endpoint) tester = TestOnefuzz(self.onefuzz, self.logger, test_id) - launch_result, repros = tester.launch_repro() - result = tester.check_repro(repros) - if not (result and launch_result): + + result = tester.check_repro() + if not result: raise Exception("repros failed") def launch( diff --git a/src/pytypes/onefuzztypes/enums.py b/src/pytypes/onefuzztypes/enums.py index 67796bd7fa..558b733829 100644 --- a/src/pytypes/onefuzztypes/enums.py +++ b/src/pytypes/onefuzztypes/enums.py @@ -80,6 +80,7 @@ class TaskFeature(Enum): expect_crash_on_failure 
= "expect_crash_on_failure" report_list = "report_list" minimized_stack_depth = "minimized_stack_depth" + input_file = "input_file" coverage_filter = "coverage_filter" target_must_use_input = "target_must_use_input" @@ -161,6 +162,7 @@ class TaskType(Enum): generic_generator = "generic_generator" generic_crash_report = "generic_crash_report" generic_regression = "generic_regression" + analysis_single = "analysis_single" class VmState(Enum): diff --git a/src/pytypes/onefuzztypes/models.py b/src/pytypes/onefuzztypes/models.py index 0e72090b85..1e87699f95 100644 --- a/src/pytypes/onefuzztypes/models.py +++ b/src/pytypes/onefuzztypes/models.py @@ -161,6 +161,7 @@ class TaskDetails(BaseModel): preserve_existing_outputs: Optional[bool] report_list: Optional[List[str]] minimized_stack_depth: Optional[int] + input_file: Optional[str] coverage_filter: Optional[str] @@ -372,6 +373,7 @@ class TaskUnitConfig(BaseModel): ensemble_sync_delay: Optional[int] report_list: Optional[List[str]] minimized_stack_depth: Optional[int] + input_file: Optional[str] coverage_filter: Optional[str] # from here forwards are Container definitions. These need to be inline