Do partial migration of corpora to zipcorpora. (#3196)
This is necessary to convert corpus_pruning and fuzz_task to untrusted.
related: #3008
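
For orientation, the new zipcorpus layout stores zip archives under a
'zipped' prefix alongside the existing flat corpus. A sketch of the resulting
GCS names, using made-up bucket and target names (the prefixes are the ones
defined in corpus_manager.py below):

ZIPPED_PATH_PREFIX = 'zipped'
PARTIAL_ZIPCORPUS_PREFIX = 'partial'
BASE_ZIPCORPUS_PREFIX = 'base'

bucket_name = 'example-corpus-bucket'  # Made up for illustration.
bucket_path = '/libFuzzer/example_project_example_fuzzer'

zipcorpus_dir = f'gs://{bucket_name}/{ZIPPED_PATH_PREFIX}{bucket_path}/'
base_zipcorpus = f'{zipcorpus_dir}{BASE_ZIPCORPUS_PREFIX}.zip'
partial_zipcorpus = f'{zipcorpus_dir}{PARTIAL_ZIPCORPUS_PREFIX}-2023-07-10-120000.zip'
# base_zipcorpus    -> gs://example-corpus-bucket/zipped/libFuzzer/example_project_example_fuzzer/base.zip
# partial_zipcorpus -> .../zipped/libFuzzer/example_project_example_fuzzer/partial-2023-07-10-120000.zip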
jonathanmetzman authored Jul 10, 2023
1 parent 639485d commit 10b09a7
Showing 6 changed files with 215 additions and 48 deletions.
2 changes: 1 addition & 1 deletion src/clusterfuzz/_internal/base/retry.py
@@ -43,7 +43,7 @@ def get_delay(num_try, delay, backoff):
delay = delay * (backoff**(num_try - 1))
if _should_ignore_delay_for_testing():
# Don't sleep for long during tests. Flake is better.
return min(delay, 2)
return min(delay, 1)

return delay
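
For reference, the delay above grows exponentially with the attempt number;
a standalone illustration of the formula (not the module itself):

def get_delay(num_try, delay, backoff):
  """Exponential backoff: base delay times backoff^(num_try - 1)."""
  return delay * (backoff**(num_try - 1))

assert get_delay(1, 2, 2) == 2   # First try: just the base delay.
assert get_delay(2, 2, 2) == 4   # Each retry multiplies by backoff=2.
assert get_delay(4, 2, 2) == 16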

1 change: 0 additions & 1 deletion src/clusterfuzz/_internal/bot/fuzzers/libFuzzer/engine.py
@@ -533,7 +533,6 @@ def minimize_corpus(self, target_path, arguments, input_dirs, output_dir,
merge_output = result.output
merge_stats = stats.parse_stats_from_merge_log(merge_output.splitlines())

# TODO(ochang): Get crashes found during merge.
return engine.FuzzResult(merge_output, result.command, [], merge_stats,
result.time_executed)
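
The stats above are parsed from libFuzzer's merge output. A rough,
hypothetical sketch of the idea (the regex and the stat keys here are
assumptions; the real parser is stats.parse_stats_from_merge_log):

import re

# libFuzzer's merge prints a summary line of roughly this shape:
#   MERGE-OUTER: 3 new files with 11 new features added
_MERGE_SUMMARY_RE = re.compile(
    r'MERGE-OUTER: (\d+) new files with (\d+) new features added')

def parse_merge_counts(log_lines):
  """Extracts assumed counters from merge log lines."""
  for line in log_lines:
    match = _MERGE_SUMMARY_RE.search(line)
    if match:
      return {'new_units_added': int(match.group(1)),
              'new_features': int(match.group(2))}
  return {'new_units_added': 0, 'new_features': 0}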

173 changes: 154 additions & 19 deletions src/clusterfuzz/_internal/fuzzing/corpus_manager.py
@@ -13,13 +13,18 @@
# limitations under the License.
"""Functions for corpus synchronization with GCS."""

import contextlib
import datetime
import os
import re
import shutil
import uuid
import zipfile

from clusterfuzz._internal.base import utils
from clusterfuzz._internal.google_cloud_utils import storage
from clusterfuzz._internal.metrics import logs
from clusterfuzz._internal.system import archive
from clusterfuzz._internal.system import environment
from clusterfuzz._internal.system import shell

@@ -40,6 +45,10 @@
PUBLIC_BACKUP_TIMESTAMP = 'public'
REGRESSIONS_GCS_PATH_SUFFIX = '_regressions'

ZIPPED_PATH_PREFIX = 'zipped'
PARTIAL_ZIPCORPUS_PREFIX = 'partial'
BASE_ZIPCORPUS_PREFIX = 'base'

RSYNC_ERROR_REGEX = (br'CommandException:\s*(\d+)\s*files?/objects? '
br'could not be copied/removed')

@@ -140,8 +149,8 @@ def gcs_url_for_backup_directory(backup_bucket_name, fuzzer_name,
Returns:
A string giving the GCS URL.
"""
return 'gs://%s/corpus/%s/%s/' % (backup_bucket_name, fuzzer_name,
project_qualified_target_name)
return (f'gs://{backup_bucket_name}/corpus/{fuzzer_name}/'
f'{project_qualified_target_name}/')


def gcs_url_for_backup_file(backup_bucket_name, fuzzer_name,
@@ -154,16 +163,12 @@ def gcs_url_for_backup_file(backup_bucket_name, fuzzer_name,
backup_dir = gcs_url_for_backup_directory(backup_bucket_name, fuzzer_name,
project_qualified_target_name)
backup_file = str(date) + os.extsep + BACKUP_ARCHIVE_FORMAT
return '%s/%s' % (backup_dir.rstrip('/'), backup_file)
return f'{backup_dir.rstrip("/")}/{backup_file}'


def legalize_filenames(file_paths):
"""Convert the name of every file in |file_paths| a name that is legal on
Windows. Returns list of legally named files."""
# TODO(metzman): Support legalizing filenames when called on trusted host, but
# file paths exist on untrusted workers. This is fine for now since Linux is
# the only supported platform on OSS-Fuzz and this functionality is not needed
# in OSS-Fuzz.
if environment.is_trusted_host():
return file_paths

@@ -202,9 +207,10 @@ def legalize_corpus_files(directory):
legalize_filenames(files_list)
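
The body of legalize_filenames is collapsed above; a rough sketch of the
idea (the illegal-character set and the rename policy here are assumptions,
not the actual implementation):

import os
import re

# Assumed set of characters that Windows rejects in filenames.
_WINDOWS_ILLEGAL_CHARS = re.compile(r'[<>:"/\\|?*]')

def legalize_filename_sketch(file_path):
  """Renames the file on disk if its basename is illegal on Windows."""
  directory, filename = os.path.split(file_path)
  legal_filename = _WINDOWS_ILLEGAL_CHARS.sub('_', filename)
  if legal_filename == filename:
    return file_path
  legal_path = os.path.join(directory, legal_filename)
  os.rename(file_path, legal_path)
  return legal_path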


class GcsCorpus(object):
class GcsCorpus:
"""Google Cloud Storage corpus."""

# TODO(metzman): Consider merging this with FuzzTargetCorpus.
def __init__(self,
bucket_name,
bucket_path='/',
@@ -234,7 +240,9 @@ def get_gcs_url(self, suffix=''):
Returns:
A string giving the GCS URL.
"""
url = 'gs://%s' % self.bucket_name + self.bucket_path + suffix
# TODO(metzman): Delete this after we are done migrating to the zipcorpus
# format.
url = f'gs://{self.bucket_name}{self.bucket_path}{suffix}'
if not url.endswith('/'):
# Ensure that the bucket path is '/' terminated. Without this, when a
# single file is being uploaded, it is renamed to the trailing non-/
@@ -243,6 +251,20 @@

return url

def get_zipcorpus_gcs_dir_url(self):
"""Build zipcorpus GCS URL for gsutil.
Returns:
A string giving the GCS URL.
"""
url = storage.get_cloud_storage_file_path(
self.bucket_name, f'{ZIPPED_PATH_PREFIX}{self.bucket_path}')
if not url.endswith('/'):
# Ensure that the bucket path is '/' terminated. Without this, when a
# single file is being uploaded, it is renamed to the trailing non-/
# terminated directory name instead.
url += '/'
return url

def rsync_from_disk(self,
directory,
timeout=CORPUS_FILES_SYNC_TIMEOUT,
@@ -259,12 +281,57 @@ def rsync_from_disk(self,
"""
corpus_gcs_url = self.get_gcs_url()
legalize_corpus_files(directory)
result = self._gsutil_runner.rsync(directory, corpus_gcs_url, timeout,
delete)
result = self._gsutil_runner.rsync(
directory, corpus_gcs_url, timeout, delete=delete)

# Upload zipcorpus.
# TODO(metzman): Get rid of the rest of this function when migration is
# complete.
filenames = shell.get_files_list(directory)
self._upload_to_zipcorpus(filenames, partial=False)

# Allow a small number of files to fail to be synced.
return _handle_rsync_result(result, max_errors=MAX_SYNC_ERRORS)
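
During the migration each sync is a dual write: the rsync to the flat layout
plus a full snapshot uploaded as the base zipcorpus. A caller's-eye sketch,
assuming FuzzTargetCorpus takes the engine and project-qualified target name
(all names here are made up):

# Hypothetical usage; bucket/target names are illustrative only.
corpus = FuzzTargetCorpus('libFuzzer', 'example_project_example_fuzzer')
# One call now performs both writes: an rsync of the raw files to the flat
# corpus, then a zip of the same files uploaded as base.zip (partial=False).
corpus.rsync_from_disk('/corpus/example_fuzzer')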

def get_zipcorpora_gcs_urls(self, max_partial_corpora=float('inf')):
"""Generates a sequence of GCS paths containing the base corpus and at most
|max_partial_corpora| (all of them by default) of the most recent partial
corpora. Note that this function can return a non-existent base zipcorpus,
so callers must ensure the zipcorpus exists before copying it."""
yield self.get_zipcorpus_gcs_url(partial=False)
partial_corpora_gcs_url = (
f'{self.get_zipcorpus_gcs_dir_url()}{PARTIAL_ZIPCORPUS_PREFIX}*')
partial_corpora = reversed(
list(storage.list_blobs(partial_corpora_gcs_url)))
for idx, partial_corpus in enumerate(partial_corpora):
if idx >= max_partial_corpora:
break
yield partial_corpus
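
A usage sketch for the generator above (hypothetical caller; the existence
check matters because the base zipcorpus may not exist yet):

# Fetch the base zipcorpus plus at most the 5 newest partial zipcorpora.
for zipcorpus_url in corpus.get_zipcorpora_gcs_urls(max_partial_corpora=5):
  if not storage.exists(zipcorpus_url):
    continue
  print('Would download:', zipcorpus_url)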

def download_zipcorpora(self, dst_dir):
"""Downloads zipcorpora, unzips their contents, and stores them in
|dst_dir|"""
for zipcorpus_url in self.get_zipcorpora_gcs_urls():
# TODO(metzman): Find out what the tradeoff is between writing the file to
# disk first and unpacking it in-memory.
with get_temp_zip_filename() as temp_zip_filename:
if not storage.exists(zipcorpus_url):
# This is expected to happen in two scenarios:
# 1. When a fuzzer is new, get_zipcorpora_gcs_urls() will always
# return the base zipcorpus even if it doesn't exist.
# 2. When this function is executed concurrently with a corpus prune,
# the intermediate corpus may be deleted.
if zipcorpus_url.endswith(f'{BASE_ZIPCORPUS_PREFIX}.zip'):
logs.log_warn(f'Base zipcorpus {zipcorpus_url} does not exist.')
else:
logs.log_error(
f'Zipcorpus {zipcorpus_url} was expected to exist but does not.'
)
continue
if not storage.copy_file_from(zipcorpus_url, temp_zip_filename):
continue
archive.unpack(temp_zip_filename, dst_dir)

def rsync_to_disk(self,
directory,
timeout=CORPUS_FILES_SYNC_TIMEOUT,
@@ -285,6 +352,11 @@ def rsync_to_disk(self,
result = self._gsutil_runner.rsync(corpus_gcs_url, directory, timeout,
delete)

# Download zipcorpora.
# TODO(metzman): Get rid of the rest of this function when migration is
# complete.
self.download_zipcorpora(directory)

# Allow a small number of files to fail to be synced.
return _handle_rsync_result(result, max_errors=MAX_SYNC_ERRORS)

@@ -297,16 +369,77 @@ def upload_files(self, file_paths, timeout=CORPUS_FILES_SYNC_TIMEOUT):
Returns:
A bool indicating whether or not the command succeeded.
"""
# TODO(metzman): Merge this with rsync from disk when migration is complete.
if not file_paths:
return True

# Get a new file_paths iterator where all files have been renamed to be
# legal on Windows.
file_paths = legalize_filenames(file_paths)
gcs_url = self.get_gcs_url()
return self._gsutil_runner.upload_files_to_url(
result = self._gsutil_runner.upload_files_to_url(
file_paths, gcs_url, timeout=timeout)

# Upload zipcorpus.
# TODO(metzman): Get rid of the rest of this function when migration is
# complete.
self._upload_to_zipcorpus(file_paths, partial=True)
return result

def get_zipcorpus_gcs_url(self, partial):
"""Returns a zipcorpus URL for a partial zipcorpus if |partial|, or for a
base corpus if not."""
zipcorpus_url = self.get_zipcorpus_gcs_dir_url()
if partial:
timestamp = datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S')
prefix = f'{PARTIAL_ZIPCORPUS_PREFIX}-{timestamp}'
else:
prefix = BASE_ZIPCORPUS_PREFIX

filename = f'{prefix}.zip'
return zipcorpus_url + filename
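
The timestamp format sorts lexicographically in chronological order, which
is what lets get_zipcorpora_gcs_urls recover recency with a plain reversed()
over the blob listing (assuming the listing is lexicographically ordered, as
GCS object listings are). For example:

import datetime

FORMAT = '%Y-%m-%d-%H%M%S'
earlier = datetime.datetime(2023, 7, 9, 23, 59, 59).strftime(FORMAT)
later = datetime.datetime(2023, 7, 10, 0, 0, 1).strftime(FORMAT)
# String order matches time order, so the newest partial sorts last.
assert earlier < later  # '2023-07-09-235959' < '2023-07-10-000001'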

def _upload_to_zipcorpus(self, file_paths, partial):
"""Uploads |file_paths| to the zipcorpus on GCS. Uploads them as part of a
partial corpus if |partial| is True."""
gcs_url = self.get_zipcorpus_gcs_url(partial)
with temp_zipfile(file_paths) as archive_path:
storage.copy_file_to(archive_path, gcs_url)


@contextlib.contextmanager
def temp_zipfile(file_paths):
"""Yields a temporary zip file containing |file_paths|."""
file_paths = list(file_paths)
seen_filenames = set()

# Because of the way this function is used, we will probably only ever get
# file_paths from one directory. But if we were to get files from different
# directories, name collisions are possible. So make a low-effort, no-cost
# attempt to avoid this problem while preserving the original names for
# human readers.
def get_unique_name(filename):
if filename not in seen_filenames:
seen_filenames.add(filename)
return filename
new_filename = f'{filename}-{str(uuid.uuid4()).lower()}'
# Assume no collisions
# https://stackoverflow.com/questions/24876188/how-big-is-the-chance-to-get-a-java-uuid-randomuuid-collision
seen_filenames.add(new_filename)
return new_filename

with get_temp_zip_filename() as zip_filename:
with zipfile.ZipFile(zip_filename, 'w') as zip_file:
for file_path in file_paths:
# Don't use the leading paths.
name = os.path.basename(file_path)
name = get_unique_name(name)
zip_file.write(file_path, arcname=name)

# Make sure to yield after the zip file is closed otherwise the user will
# get an incomplete zip file.
yield zip_filename
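
A usage sketch for temp_zipfile (paths made up; the temporary archive is
deleted when the block exits):

# Hypothetical usage: zip two corpus units and upload the archive.
with temp_zipfile(['/corpus/unit_a', '/corpus/unit_b']) as zip_path:
  storage.copy_file_to(zip_path, 'gs://example-bucket/zipped/example/base.zip')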


class FuzzTargetCorpus(GcsCorpus):
"""Engine fuzzer (libFuzzer, AFL) specific corpus."""
@@ -346,15 +479,15 @@ def __init__(self,
GcsCorpus.__init__(
self,
sync_corpus_bucket_name,
'/%s/%s' % (self._engine, self._project_qualified_target_name),
f'/{self._engine}/{self._project_qualified_target_name}',
log_results=log_results,
gsutil_runner_func=gsutil_runner_func,
)

self._regressions_corpus = GcsCorpus(
sync_corpus_bucket_name,
'/%s/%s%s' % (self._engine, self._project_qualified_target_name,
REGRESSIONS_GCS_PATH_SUFFIX),
f'/{self._engine}/{self._project_qualified_target_name}'
f'{REGRESSIONS_GCS_PATH_SUFFIX}',
log_results=log_results,
gsutil_runner_func=gsutil_runner_func) if include_regressions else None

@@ -382,8 +515,7 @@ def rsync_from_disk(self,
Returns:
A bool indicating whether or not the command succeeded.
"""
result = GcsCorpus.rsync_from_disk(
self, directory, timeout=timeout, delete=delete)
result = super().rsync_from_disk(directory, timeout=timeout, delete=delete)

num_files = _count_corpus_files(directory)
if self._log_results:
@@ -408,8 +540,7 @@ def rsync_to_disk(self,
Returns:
A bool indicating whether or not the command succeeded.
"""
result = GcsCorpus.rsync_to_disk(
self, directory, timeout=timeout, delete=delete)
result = super().rsync_to_disk(directory, timeout=timeout, delete=delete)
if not result:
return False

@@ -429,3 +560,7 @@ def rsync_to_disk(self,
def get_regressions_corpus_gcs_url(self):
"""Return gcs path to directory containing crash regressions."""
return self.get_gcs_url(suffix=REGRESSIONS_GCS_PATH_SUFFIX)


def get_temp_zip_filename():
"""Returns a context manager that yields a temp file path ending in .zip."""
return shell.get_tempfile(suffix='.zip')
19 changes: 19 additions & 0 deletions src/clusterfuzz/_internal/system/shell.py
@@ -13,13 +13,15 @@
# limitations under the License.
"""Shell related functions."""

import contextlib
import os
import re
import shlex
import shutil
import subprocess
import sys
import tempfile
import uuid

from clusterfuzz._internal.base import persistent_cache
from clusterfuzz._internal.metrics import logs
@@ -435,6 +437,23 @@ def remove_file(file_path):
pass


def _get_random_filename():
return str(uuid.uuid4()).lower()


@contextlib.contextmanager
def get_tempfile(prefix='', suffix=''):
"""Returns path to a temporary file."""
tempdir = environment.get_value('BOT_TMPDIR', '/tmp')
os.makedirs(tempdir, exist_ok=True)
basename = _get_random_filename()
filename = f'{prefix}{basename}{suffix}'
filepath = os.path.join(tempdir, filename)
yield filepath
if os.path.exists(filepath):
os.remove(filepath)
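
A usage sketch (the helper yields a path without creating the file; it only
removes whatever the caller created at that path on exit):

# Hypothetical usage of the context manager above.
with get_tempfile(prefix='corpus-', suffix='.zip') as temp_path:
  with open(temp_path, 'wb') as handle:
    handle.write(b'example contents')
# temp_path has been deleted here (if it was created).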


def remove_directory(directory, recreate=False, ignore_errors=False):
"""Removes a directory tree."""
# Log errors as warnings if |ignore_errors| is set.