Skip to content

Commit

Permalink
[AIRFLOW-5072] gcs_hook should download once (#5685)
Browse files Browse the repository at this point in the history
(cherry-picked from 4ab6982f0b1d42f20c7d6d62734636445f7e0104)

GitOrigin-RevId: da765f487ff6d1eb877935940cf7de9e49169a41
  • Loading branch information
Tobias Kaymak authored and Cloud Composer Team committed Jun 4, 2021
1 parent 3708cc0 commit c967da7
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 8 deletions.
5 changes: 5 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ Pool size can now be set to -1 to indicate infinite size (it also includes
optimisation of pool query which lead to poor task n^2 performance of task
pool queries in MySQL).

### Google Cloud Storage Hook

The `GoogleCloudStorageDownloadOperator` can either write to a supplied `filename` or
return the content of a file via xcom through `store_to_xcom_key` - both options are mutually exclusive.

## Airflow 1.10.6

### Changes to `aws_default` Connection's default region
Expand Down
12 changes: 9 additions & 3 deletions airflow/contrib/hooks/gcs_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,12 @@ def rewrite(self, source_bucket, source_object, destination_bucket,
# pylint:disable=redefined-builtin
def download(self, bucket, object, filename=None):
"""
Get a file from Google Cloud Storage.
Downloads a file from Google Cloud Storage.
When no filename is supplied, the operator loads the file into memory and returns its
content. When a filename is supplied, it writes the file to the specified location and
returns the location. For file sizes that exceed the available memory it is recommended
to write to a file.
:param bucket: The bucket to fetch from.
:type bucket: str
Expand All @@ -169,8 +174,9 @@ def download(self, bucket, object, filename=None):
if filename:
blob.download_to_filename(filename)
self.log.info('File downloaded to %s', filename)

return blob.download_as_string()
return filename
else:
return blob.download_as_string()

# pylint:disable=redefined-builtin
def upload(self, bucket, object, filename,
Expand Down
14 changes: 11 additions & 3 deletions airflow/contrib/operators/gcs_download_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ def __init__(self,
*args,
**kwargs):
super(GoogleCloudStorageDownloadOperator, self).__init__(*args, **kwargs)

if filename is not None and store_to_xcom_key is not None:
raise ValueError("Either filename or store_to_xcom_key can be set")

self.bucket = bucket
self.object = object
self.filename = filename
Expand All @@ -84,13 +88,17 @@ def execute(self, context):
google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to
)
file_bytes = hook.download(bucket=self.bucket,
object=self.object,
filename=self.filename)

if self.store_to_xcom_key:
file_bytes = hook.download(bucket=self.bucket,
object=self.object)
if sys.getsizeof(file_bytes) < MAX_XCOM_SIZE:
context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes)
else:
raise RuntimeError(
'The size of the downloaded file is too large to push to XCom!'
)
else:
hook.download(bucket=self.bucket,
object=self.object,
filename=self.filename)
3 changes: 1 addition & 2 deletions tests/contrib/hooks/test_gcs_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,12 +565,11 @@ def test_download_to_file(self, mock_service):
download_as_a_string_method = mock_service.return_value.bucket.return_value \
.blob.return_value.download_as_string
download_as_a_string_method.return_value = test_object_bytes

response = self.gcs_hook.download(bucket=test_bucket,
object=test_object,
filename=test_file)

self.assertEquals(response, test_object_bytes)
self.assertEqual(response, test_file)
download_filename_method.assert_called_once_with(test_file)


Expand Down

0 comments on commit c967da7

Please sign in to comment.