diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py index dfcda3601de0d..32efb1d77e3cf 100644 --- a/airflow/kubernetes/pod_launcher.py +++ b/airflow/kubernetes/pod_launcher.py @@ -167,7 +167,7 @@ def base_container_is_running(self, pod: V1Pod): wait=tenacity.wait_exponential(), reraise=True ) - def read_pod_logs(self, pod: V1Pod): + def read_pod_logs(self, pod: V1Pod, tail_lines: int = 10): """Reads log from the POD""" try: return self._client.read_namespaced_pod_log( @@ -175,7 +175,7 @@ def read_pod_logs(self, pod: V1Pod): namespace=pod.metadata.namespace, container='base', follow=True, - tail_lines=10, + tail_lines=tail_lines, _preload_content=False ) except BaseHTTPError as e: diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index a8c5c1de38756..e26cf90056cee 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -107,6 +107,30 @@ def _read(self, ti, try_number, metadata=None): # pylint: disable=unused-argume except Exception as e: # pylint: disable=broad-except log = "*** Failed to load local log file: {}\n".format(location) log += "*** {}\n".format(str(e)) + elif conf.get('core', 'executor') == 'KubernetesExecutor': + log += '*** Trying to get logs (last 100 lines) from worker pod {} ***\n\n'\ + .format(ti.hostname) + + try: + from airflow.kubernetes.kube_client import get_kube_client + + kube_client = get_kube_client() + res = kube_client.read_namespaced_pod_log( + name=ti.hostname, + namespace=conf.get('kubernetes', 'namespace'), + container='base', + follow=False, + tail_lines=100, + _preload_content=False + ) + + for line in res: + log += line.decode() + + except Exception as f: # pylint: disable=broad-except + log += '*** Unable to fetch logs from worker pod {} ***\n{}\n\n'.format( + ti.hostname, str(f) + ) else: url = os.path.join( "http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path diff --git a/tests/kubernetes/test_pod_launcher.py b/tests/kubernetes/test_pod_launcher.py index 96ca484c9502a..be3812b4b0e09 100644 --- a/tests/kubernetes/test_pod_launcher.py +++ b/tests/kubernetes/test_pod_launcher.py @@ -75,6 +75,24 @@ def test_read_pod_logs_retries_fails(self): mock.sentinel ) + def test_read_pod_logs_successfully_with_tail_lines(self): + mock.sentinel.metadata = mock.MagicMock() + self.mock_kube_client.read_namespaced_pod_log.side_effect = [ + mock.sentinel.logs + ] + logs = self.pod_launcher.read_pod_logs(mock.sentinel, 100) + self.assertEqual(mock.sentinel.logs, logs) + self.mock_kube_client.read_namespaced_pod_log.assert_has_calls([ + mock.call( + _preload_content=False, + container='base', + follow=True, + name=mock.sentinel.metadata.name, + namespace=mock.sentinel.metadata.namespace, + tail_lines=100 + ), + ]) + def test_read_pod_events_successfully_returns_events(self): mock.sentinel.metadata = mock.MagicMock() self.mock_kube_client.list_namespaced_event.return_value = mock.sentinel.events