Skip to content

Commit

Permalink
[consolidation] Add tooling for executing utasks in-memory. (#3050)
Browse files Browse the repository at this point in the history
This will be a good middle ground to test on before using GCS, and it
will be more performant than using GCS.
Related: #3008
  • Loading branch information
jonathanmetzman authored May 11, 2023
1 parent 08fda73 commit 2d6d58b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 8 deletions.
17 changes: 16 additions & 1 deletion src/clusterfuzz/_internal/bot/tasks/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,22 @@ def execute(self, task_argument, job_type, uworker_env):
input_download_url, output_download_url = preprocess_result
utasks.uworker_main(self.module, input_download_url)
utasks.tworker_postprocess(self.module, output_download_url)
logs.log('utask local: done')
logs.log('Utask local: done.')


class UTaskLocalInmemoryExecutor(BaseTask):
  """Runs an untrusted task start to finish on this machine, handing the data
  between stages directly in memory."""

  def execute(self, task_argument, job_type, uworker_env):
    """Runs the preprocess, main and postprocess stages of the utask back to
    back, feeding each stage's result straight into the next one."""
    preprocess_result = utasks.tworker_preprocess_no_io(
        self.module, task_argument, job_type, uworker_env)
    if preprocess_result is not None:
      # Only continue when preprocessing produced input for the main stage.
      main_result = utasks.uworker_main_no_io(self.module, preprocess_result)
      utasks.uworker_postprocess_no_io(self.module, main_result)
      logs.log('Utask local: done.')


class UTask(BaseTask):
Expand Down
45 changes: 38 additions & 7 deletions src/clusterfuzz/_internal/bot/tasks/utasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,47 @@
from clusterfuzz._internal.system import environment


def tworker_preprocess_no_io(utask_module, task_argument, job_type,
                             uworker_env):
  """Runs the preprocess step of |utask_module| and returns the serialized
  uworker input, or None when preprocessing yields nothing to work on."""
  preprocessed = utask_module.utask_preprocess(task_argument, job_type,
                                               uworker_env)
  if preprocessed:
    # The job type is attached here, so preprocess must not have set it.
    assert 'job_type' not in preprocessed
    preprocessed['job_type'] = job_type
    return uworker_io.serialize_uworker_input(preprocessed)
  return None


def uworker_main_no_io(utask_module, uworker_input):
  """Executes the main part of a utask on the uworker (locally if not using
  remote executor). Takes the serialized input directly and returns the
  serialized output."""
  main_kwargs = uworker_io.deserialize_uworker_input(uworker_input)

  # The environment is applied to the process instead of being passed on to
  # utask_main as a keyword argument.
  env = main_kwargs.pop('uworker_env')
  set_uworker_env(env)

  result = utask_module.utask_main(**main_kwargs)
  # Record the environment and the input on the output before serializing it.
  result.uworker_env = env
  result.uworker_input = main_kwargs
  return uworker_io.serialize_uworker_output(result)


def uworker_postprocess_no_io(utask_module, uworker_output):
  """Deserializes |uworker_output|, rebuilds the output object from the
  resulting dict and runs the postprocess step of |utask_module| on it."""
  output = uworker_io.uworker_output_from_dict(
      uworker_io.deserialize_uworker_output(uworker_output))
  utask_module.utask_postprocess(output)


def tworker_preprocess(utask_module, task_argument, job_type, uworker_env):
"""Executes the preprocessing step of the utask |task_module| and returns the
"""Executes the preprocessing step of the utask |utask_module| and returns the
signed download URL for the uworker's input and the (unsigned) download URL
for its output."""
# Do preprocessing.
uworker_input = utask_module.utask_preprocess(task_argument, job_type,
uworker_env)
if not uworker_input:
# Bail if preprocessing failed since we can't proceed.
return None, None
return None

# Get URLs for the uworker's output. We need a signed upload URL so it can
# write its output. Also get a download URL in case the caller wants to read
Expand All @@ -46,13 +77,13 @@ def tworker_preprocess(utask_module, task_argument, job_type, uworker_env):
return uworker_input_download_url, uworker_output_download_gcs_url


def set_uworker_env(uworker_env):
def set_uworker_env(uworker_env: dict) -> None:
  """Applies every key/value pair of |uworker_env| to the actual environment
  through environment.set_value."""
  for entry in uworker_env.items():
    # Each entry is a (key, value) pair.
    environment.set_value(*entry)


def uworker_main(task_module, input_download_url):
def uworker_main(utask_module, input_download_url) -> None:
"""Exectues the main part of a utask on the uworker (locally if not using
remote executor)."""
uworker_input = uworker_io.download_and_deserialize_uworker_input(
Expand All @@ -63,16 +94,16 @@ def uworker_main(task_module, input_download_url):
uworker_env = uworker_input.pop('uworker_env')
set_uworker_env(uworker_env)

uworker_output = task_module.utask_main(**uworker_input)
uworker_output = utask_module.utask_main(**uworker_input)
uworker_output.uworker_env = uworker_env
uworker_output.uworker_input = uworker_input
uworker_io.serialize_and_upload_uworker_output(uworker_output,
uworker_output_upload_url)


def tworker_postprocess(task_module, output_download_url):
def tworker_postprocess(utask_module, output_download_url) -> None:
"""Executes the postprocess step on the trusted (t)worker."""
uworker_output_dict = uworker_io.download_and_deserialize_uworker_output(
output_download_url)
uworker_output = uworker_io.uworker_output_from_dict(uworker_output_dict)
task_module.utask_postprocess(uworker_output)
utask_module.utask_postprocess(uworker_output)

0 comments on commit 2d6d58b

Please sign in to comment.