From 5cb4fb099896f9c29fcb449d9e7cb0ec3ff5d694 Mon Sep 17 00:00:00 2001 From: Victor Chiapaikeo Date: Wed, 8 Nov 2023 13:28:39 -0500 Subject: [PATCH 1/2] Add log lookup exception for empty op subtypes --- airflow/utils/log/file_task_handler.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 3e2561ba75c22..a03c05831f952 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -304,6 +304,16 @@ def _executor_get_task_log(self) -> Callable[[TaskInstance, int], tuple[list[str executor = ExecutorLoader.get_default_executor() return executor.get_task_log + @cached_property + def _test_mode(self) -> bool: + """Pulls the unit test mode flag from config. + + This lives in a function here to be cached and only hit the config once. + """ + from airflow.configuration import conf + + return conf.getboolean("core", "unit_test_mode") + def _read( self, ti: TaskInstance, @@ -357,7 +367,9 @@ def _read( worker_log_full_path = Path(self.local_base, worker_log_rel_path) local_messages, local_logs = self._read_from_local(worker_log_full_path) messages_list.extend(local_messages) - if is_running and not executor_messages: + if ti.task.inherits_from_empty_operator is True and self._test_mode is False: + executor_logs.append("Operator inherits from empty operator and thus does not have logs") + elif is_running and not executor_messages: served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) messages_list.extend(served_messages) elif ti.state not in State.unfinished and not (local_logs or remote_logs): From e2055441805d36638e00c0bacc9c29f17257dbc5 Mon Sep 17 00:00:00 2001 From: Victor Chiapaikeo Date: Thu, 11 Jan 2024 09:49:07 -0500 Subject: [PATCH 2/2] Use exception catching approach instead to preserve tests --- airflow/utils/log/file_task_handler.py | 27 ++++++++++---------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index a03c05831f952..f86cc77736dee 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -29,6 +29,7 @@ from typing import TYPE_CHECKING, Any, Callable, Iterable from urllib.parse import urljoin +import httpx import pendulum from airflow.configuration import conf @@ -78,8 +79,6 @@ def _set_task_deferred_context_var(): def _fetch_logs_from_service(url, log_relative_path): - import httpx - from airflow.utils.jwt_signer import JWTSigner timeout = conf.getint("webserver", "log_fetch_timeout_sec", fallback=None) @@ -170,6 +169,9 @@ class FileTaskHandler(logging.Handler): """ trigger_should_wrap = True + inherits_from_empty_operator_log_message = ( + "Operator inherits from empty operator and thus does not have logs" + ) def __init__(self, base_log_folder: str, filename_template: str | None = None): super().__init__() @@ -304,16 +306,6 @@ def _executor_get_task_log(self) -> Callable[[TaskInstance, int], tuple[list[str executor = ExecutorLoader.get_default_executor() return executor.get_task_log - @cached_property - def _test_mode(self) -> bool: - """Pulls the unit test mode flag from config. - - This lives in a function here to be cached and only hit the config once. - """ - from airflow.configuration import conf - - return conf.getboolean("core", "unit_test_mode") - def _read( self, ti: TaskInstance, @@ -367,9 +359,7 @@ def _read( worker_log_full_path = Path(self.local_base, worker_log_rel_path) local_messages, local_logs = self._read_from_local(worker_log_full_path) messages_list.extend(local_messages) - if ti.task.inherits_from_empty_operator is True and self._test_mode is False: - executor_logs.append("Operator inherits from empty operator and thus does not have logs") - elif is_running and not executor_messages: + if is_running and not executor_messages: served_messages, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) messages_list.extend(served_messages) elif ti.state not in State.unfinished and not (local_logs or remote_logs): @@ -567,8 +557,11 @@ def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], li messages.append(f"Found logs served from host {url}") logs.append(response.text) except Exception as e: - messages.append(f"Could not read served logs: {e}") - logger.exception("Could not read served logs") + if isinstance(e, httpx.UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True: + messages.append(self.inherits_from_empty_operator_log_message) + else: + messages.append(f"Could not read served logs: {e}") + logger.exception("Could not read served logs") return messages, logs def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], list[str]]: