[AIRFLOW-2981] Fix TypeError in dataflow operators (apache#3831)
- Fix TypeError in dataflow operators when using GCS jar or py_file
kaxil authored and ashb committed Oct 22, 2018
1 parent a3ecc0a commit abb7248
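
For context, the first fix targets a Python 3 order comparison: a list cannot be compared to an int, so the old `if path_components < 2:` check raised the reported TypeError. A minimal standalone reproduction (illustrative, not operator code):

    path_components = 'gs://test-bucket'[len('gs://'):].split('/')
    print(path_components)  # ['test-bucket']
    path_components < 2     # TypeError: '<' not supported between instances of 'list' and 'int'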
Showing 2 changed files with 29 additions and 6 deletions.
6 changes: 3 additions & 3 deletions airflow/contrib/operators/dataflow_operator.py
@@ -16,7 +16,7 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import os
 import re
 import uuid
 import copy
@@ -358,7 +358,7 @@ def google_cloud_to_local(self, file_name):
         # Extracts bucket_id and object_id by first removing 'gs://' prefix and
         # then split the remaining by path delimiter '/'.
         path_components = file_name[self.GCS_PREFIX_LENGTH:].split('/')
-        if path_components < 2:
+        if len(path_components) < 2:
             raise Exception(
                 'Invalid Google Cloud Storage (GCS) object path: {}.'
                 .format(file_name))
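
A standalone sketch of the corrected validation, runnable outside Airflow; the name `split_gcs_path` and the module-level constant are illustrative, not part of the operator:

    GCS_PREFIX_LENGTH = len('gs://')  # mirrors the class attribute used above

    def split_gcs_path(file_name):
        # Strip 'gs://', then separate the bucket from the object path.
        path_components = file_name[GCS_PREFIX_LENGTH:].split('/')
        if len(path_components) < 2:  # the fix: compare the count, not the list
            raise Exception(
                'Invalid Google Cloud Storage (GCS) object path: {}.'
                .format(file_name))
        return path_components[0], '/'.join(path_components[1:])

    print(split_gcs_path('gs://test-bucket/dataflow/wordcount.jar'))
    # ('test-bucket', 'dataflow/wordcount.jar')
    split_gcs_path('gs://test-bucket')  # raises: invalid GCS object path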
@@ -369,7 +369,7 @@ def google_cloud_to_local(self, file_name):
             path_components[-1])
         file_size = self._gcs_hook.download(bucket_id, object_id, local_file)
 
-        if file_size > 0:
+        if os.stat(file_size).st_size > 0:
             return local_file
         raise Exception(
             'Failed to download Google Cloud Storage GCS object: {}'
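
The second fix replaces the direct `file_size > 0` comparison with an `os.stat(...).st_size` check, i.e. the downloaded local copy must be non-empty before the operator proceeds. A sketch of that idiom, with a temp file standing in for the copy written by `_gcs_hook.download()` (paths hypothetical):

    import os
    import tempfile

    # Stand-in for the local copy produced by the GCS download.
    with tempfile.NamedTemporaryFile(suffix='.jar', delete=False) as f:
        f.write(b'fake jar bytes')
        local_file = f.name

    if os.stat(local_file).st_size > 0:
        print('non-empty download at {}'.format(local_file))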
29 changes: 26 additions & 3 deletions tests/contrib/operators/test_dataflow_operator.py
@@ -20,9 +20,10 @@
 
 import unittest
 
-from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator, \
-    DataFlowJavaOperator, DataflowTemplateOperator
-from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
+from airflow.contrib.operators.dataflow_operator import \
+    DataFlowPythonOperator, DataFlowJavaOperator, \
+    DataflowTemplateOperator, GoogleCloudBucketHelper
+
 from airflow.version import version
 
 try:
@@ -186,3 +187,25 @@ def test_exec(self, dataflow_mock):
         }
         start_template_hook.assert_called_once_with(TASK_ID, expected_options,
                                                     PARAMETERS, TEMPLATE)
+
+
+class GoogleCloudBucketHelperTest(unittest.TestCase):
+
+    @mock.patch(
+        'airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper.__init__'
+    )
+    def test_invalid_object_path(self, mock_parent_init):
+
+        # This is just the path of a bucket hence invalid filename
+        file_name = 'gs://test-bucket'
+        mock_parent_init.return_value = None
+
+        gcs_bucket_helper = GoogleCloudBucketHelper()
+        gcs_bucket_helper._gcs_hook = mock.Mock()
+
+        with self.assertRaises(Exception) as context:
+            gcs_bucket_helper.google_cloud_to_local(file_name)
+
+        self.assertEquals(
+            'Invalid Google Cloud Storage (GCS) object path: {}.'.format(file_name),
+            str(context.exception))
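
The new test constructs the helper without real GCP credentials by patching `GoogleCloudBucketHelper.__init__` to return None and stubbing `_gcs_hook` with a mock. The same pattern in isolation, with a hypothetical `RemoteClient` class standing in for the helper:

    import mock  # the standalone mock package these tests use; unittest.mock behaves the same

    class RemoteClient(object):
        def __init__(self):
            raise RuntimeError('would contact GCP here')

        def describe(self):
            return 'client ready'

    # Patching __init__ lets instantiation skip the expensive setup entirely.
    with mock.patch.object(RemoteClient, '__init__', return_value=None):
        client = RemoteClient()   # patched __init__ does nothing
        print(client.describe())  # client ready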
