Skip to content

Commit

Permalink
[rearchitecture] Separate input and output buckets (#3195)
Browse files Browse the repository at this point in the history
So that we can use Pub/sub messages when outputs are written.
Related: #3008
  • Loading branch information
jonathanmetzman authored Jul 10, 2023
1 parent 10b09a7 commit eebb87e
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 21 deletions.
16 changes: 13 additions & 3 deletions src/clusterfuzz/_internal/bot/tasks/utasks/uworker_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def generate_new_input_file_name():
def get_uworker_input_gcs_path():
"""Returns a GCS path for uworker I/O."""
# Inspired by blobs.write_blob.
io_bucket = storage.uworker_io_bucket()
io_bucket = storage.uworker_input_bucket()
io_file_name = generate_new_input_file_name()
if storage.get(storage.get_cloud_storage_file_path(io_bucket, io_file_name)):
raise RuntimeError(f'UUID collision found: {io_file_name}.')
Expand All @@ -45,11 +45,21 @@ def get_uworker_output_urls(input_gcs_path):
"""Returns a signed upload URL for the uworker to upload the output and a
GCS url for the tworker to download the output. Make sure we can infer the
actual input since the output is not trusted."""
gcs_path = input_gcs_path + '.output'
gcs_path = uworker_input_path_to_output_path(input_gcs_path)
# Note that the signed upload URL can't be directly downloaded from.
return storage.get_signed_upload_url(gcs_path), gcs_path


def uworker_input_path_to_output_path(input_gcs_path):
  """Returns the uworker output GCS path corresponding to |input_gcs_path|.

  The output path is the input path with the uworker input bucket name swapped
  for the uworker output bucket name; the rest of the path is kept as is.
  """
  # Replace only the first occurrence: the bucket is the leading path
  # component (e.g. '/UWORKER_INPUT/new-filename'), so an object name that
  # happens to contain the bucket name must not be rewritten as well.
  return input_gcs_path.replace(storage.uworker_input_bucket(),
                                storage.uworker_output_bucket(), 1)


def uworker_output_path_to_input_path(output_gcs_path):
  """Returns the uworker input GCS path corresponding to |output_gcs_path|.

  The input path is the output path with the uworker output bucket name
  swapped for the uworker input bucket name; the rest of the path is kept as
  is.
  """
  # Replace only the first occurrence: the bucket is the leading path
  # component (e.g. '/UWORKER_OUTPUT/new-filename'), so an object name that
  # happens to contain the bucket name must not be rewritten as well.
  return output_gcs_path.replace(storage.uworker_output_bucket(),
                                 storage.uworker_input_bucket(), 1)


def get_uworker_input_urls():
"""Returns a signed download URL for the uworker to download the input and a
GCS url for the tworker to upload it (this happens first)."""
Expand Down Expand Up @@ -169,7 +179,7 @@ def serialize_and_upload_uworker_output(uworker_output, upload_url):

def download_input_based_on_output_url(output_url):
# Get the portion that does not contain ".output".
input_url = output_url.split('.output')[0]
input_url = uworker_output_path_to_input_path(output_url)
serialized_uworker_input = storage.read_data(input_url)
return deserialize_uworker_input(serialized_uworker_input)

Expand Down
27 changes: 21 additions & 6 deletions src/clusterfuzz/_internal/google_cloud_utils/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1024,17 +1024,32 @@ def blobs_bucket():
return local_config.ProjectConfig().get('blobs.bucket')


def uworker_io_bucket():
def uworker_input_bucket():
  """Returns the name of the bucket used for uworker input.

  The TEST_UWORKER_INPUT_BUCKET environment value takes precedence when it is
  set. Otherwise the UWORKER_INPUT_BUCKET environment value is returned; if it
  is unset an error is logged and the falsy value is returned unchanged.
  """
  test_override = environment.get_value('TEST_UWORKER_INPUT_BUCKET')
  if test_override:
    return test_override

  # Unit tests are expected to provide the test override above.
  assert not environment.get_value('PY_UNITTESTS')
  # TODO(metzman): Use local config.
  input_bucket = environment.get_value('UWORKER_INPUT_BUCKET')
  if input_bucket:
    return input_bucket
  logs.log_error('UWORKER_INPUT_BUCKET is not defined.')
  return input_bucket


def uworker_output_bucket():
"""Returns the bucket where uworker output is done."""
test_uworker_io_bucket = environment.get_value('TEST_UWORKER_IO_BUCKET')
if test_uworker_io_bucket:
return test_uworker_io_bucket
test_uworker_output_bucket = environment.get_value(
'TEST_UWORKER_OUTPUT_BUCKET')
if test_uworker_output_bucket:
return test_uworker_output_bucket

assert not environment.get_value('PY_UNITTESTS')
# TODO(metzman): Use local config.
bucket = environment.get_value('UWORKER_IO_BUCKET')
bucket = environment.get_value('UWORKER_OUTPUT_BUCKET')
if not bucket:
logs.log_error('UWORKER_IO_BUCKET is not defined.')
logs.log_error('UWORKER_OUTPUT_BUCKET is not defined.')
return bucket


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,16 @@ class TestGetUrls(unittest.TestCase):
"""Tests that functions for getting urls for uploading and downloading input
and output work properly."""
FAKE_URL = 'https://fake'
WORKER_IO_BUCKET = 'UWORKER_IO'
WORKER_INPUT_BUCKET = 'UWORKER_INPUT'
WORKER_OUTPUT_BUCKET = 'UWORKER_OUTPUT'
NEW_IO_FILE_NAME = 'new-filename'
EXPECTED_GCS_PATH = '/UWORKER_IO/new-filename'
EXPECTED_INPUT_GCS_PATH = '/UWORKER_INPUT/new-filename'
EXPECTED_OUTPUT_GCS_PATH = '/UWORKER_OUTPUT/new-filename'

def setUp(self):
helpers.patch_environ(self)
os.environ['TEST_UWORKER_IO_BUCKET'] = self.WORKER_IO_BUCKET
os.environ['TEST_UWORKER_INPUT_BUCKET'] = self.WORKER_INPUT_BUCKET
os.environ['TEST_UWORKER_OUTPUT_BUCKET'] = self.WORKER_OUTPUT_BUCKET
helpers.patch(self, [
'clusterfuzz._internal.google_cloud_utils.storage.get',
'clusterfuzz._internal.google_cloud_utils.storage._sign_url',
Expand All @@ -118,20 +121,21 @@ def setUp(self):

def test_get_uworker_output_urls(self):
"""Tests that get_uworker_output_urls works."""
expected_upload_url = self.EXPECTED_GCS_PATH + '.output'
expected_urls = (self.FAKE_URL, expected_upload_url)
expected_urls = (self.FAKE_URL, self.EXPECTED_OUTPUT_GCS_PATH)
self.assertEqual(
uworker_io.get_uworker_output_urls(self.EXPECTED_GCS_PATH),
uworker_io.get_uworker_output_urls(self.EXPECTED_INPUT_GCS_PATH),
expected_urls)
self.mock._sign_url.assert_called_with(
expected_upload_url, method='PUT', minutes=DEFAULT_SIGNED_URL_MINUTES)
self.EXPECTED_OUTPUT_GCS_PATH,
method='PUT',
minutes=DEFAULT_SIGNED_URL_MINUTES)

def test_get_uworker_input_urls(self):
"""Tests that get_uworker_input_urls works."""
expected_urls = (self.FAKE_URL, self.EXPECTED_GCS_PATH)
expected_urls = (self.FAKE_URL, self.EXPECTED_INPUT_GCS_PATH)
self.assertEqual(uworker_io.get_uworker_input_urls(), expected_urls)
self.mock._sign_url.assert_called_with(
self.EXPECTED_GCS_PATH,
self.EXPECTED_INPUT_GCS_PATH,
method='GET',
minutes=DEFAULT_SIGNED_URL_MINUTES)

Expand Down Expand Up @@ -159,15 +163,16 @@ def test_error_and_testcase_behavior(self):
@test_utils.with_cloud_emulators('datastore')
class RoundTripTest(unittest.TestCase):
"""Tests round trips for download and uploading inputs and outputs."""
WORKER_IO_BUCKET = 'UWORKER_IO'
WORKER_INPUT_BUCKET = 'UWORKER_INPUT'
WORKER_OUTPUT_BUCKET = 'UWORKER_OUTPUT'
NEW_IO_FILE_NAME = 'new-filename'
EXPECTED_GCS_PATH = '/UWORKER_IO/new-filename'
FAKE_URL = 'https://fake'

def setUp(self):
helpers.patch_environ(self)
os.environ['FAIL_RETRIES'] = '1'
os.environ['TEST_UWORKER_IO_BUCKET'] = self.WORKER_IO_BUCKET
os.environ['TEST_UWORKER_INPUT_BUCKET'] = self.WORKER_INPUT_BUCKET
os.environ['TEST_UWORKER_OUTPUT_BUCKET'] = self.WORKER_OUTPUT_BUCKET
helpers.patch(self, [
'clusterfuzz._internal.google_cloud_utils.storage.get',
'clusterfuzz._internal.google_cloud_utils.storage._sign_url',
Expand Down

0 comments on commit eebb87e

Please sign in to comment.