Skip to content

Commit

Permalink
[AIRFLOW-2994] Fix command status check in Qubole Check operator (#3790)
Browse files Browse the repository at this point in the history
  • Loading branch information
sakshi2894 authored and msumit committed Aug 29, 2018
1 parent 751a332 commit 314232c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
4 changes: 2 additions & 2 deletions airflow/contrib/operators/qubole_check_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,11 @@ def get_sql_from_qbol_cmd(params):
def handle_airflow_exception(airflow_exception, hook):
cmd = hook.cmd
if cmd is not None:
if cmd.is_success:
if cmd.is_success(cmd.status):
qubole_command_results = hook.get_query_results()
qubole_command_id = cmd.id
exception_message = '\nQubole Command Id: {qubole_command_id}' \
'\nQubole Command Results:' \
'\n{qubole_command_results}'.format(**locals())
raise AirflowException(str(airflow_exception) + exception_message)
raise AirflowException(airflow_exception.message)
raise AirflowException(str(airflow_exception))
29 changes: 28 additions & 1 deletion tests/contrib/operators/test_qubole_check_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from airflow.contrib.operators.qubole_check_operator import QuboleValueCheckOperator
from airflow.contrib.hooks.qubole_check_hook import QuboleCheckHook
from airflow.contrib.hooks.qubole_hook import QuboleHook
from qds_sdk.commands import HiveCommand

try:
from unittest import mock
Expand Down Expand Up @@ -80,11 +81,13 @@ def test_execute_pass(self, mock_get_hook):
mock_hook.get_first.assert_called_with(query)

@mock.patch.object(QuboleValueCheckOperator, 'get_hook')
def test_execute_fail(self, mock_get_hook):
def test_execute_assertion_fail(self, mock_get_hook):

mock_cmd = mock.Mock()
mock_cmd.status = 'done'
mock_cmd.id = 123
mock_cmd.is_success = mock.Mock(
return_value=HiveCommand.is_success(mock_cmd.status))

mock_hook = mock.Mock()
mock_hook.get_first.return_value = [11]
Expand All @@ -97,6 +100,30 @@ def test_execute_fail(self, mock_get_hook):
'Qubole Command Id: ' + str(mock_cmd.id)):
operator.execute()

mock_cmd.is_success.assert_called_with(mock_cmd.status)

@mock.patch.object(QuboleValueCheckOperator, 'get_hook')
def test_execute_assert_query_fail(self, mock_get_hook):

mock_cmd = mock.Mock()
mock_cmd.status = 'error'
mock_cmd.id = 123
mock_cmd.is_success = mock.Mock(
return_value=HiveCommand.is_success(mock_cmd.status))

mock_hook = mock.Mock()
mock_hook.get_first.return_value = [11]
mock_hook.cmd = mock_cmd
mock_get_hook.return_value = mock_hook

operator = self.__construct_operator('select value from tab1 limit 1;', 5, 1)

with self.assertRaises(AirflowException) as cm:
operator.execute()

self.assertNotIn('Qubole Command Id: ', str(cm.exception))
mock_cmd.is_success.assert_called_with(mock_cmd.status)

@mock.patch.object(QuboleCheckHook, 'get_query_results')
@mock.patch.object(QuboleHook, 'execute')
def test_results_parser_callable(self, mock_execute, mock_get_query_results):
Expand Down

0 comments on commit 314232c

Please sign in to comment.