diff --git a/airflow/kubernetes/pod_launcher_deprecated.py b/airflow/kubernetes/pod_launcher_deprecated.py index 3367aa0cc23ef..ccf03b1e493c2 100644 --- a/airflow/kubernetes/pod_launcher_deprecated.py +++ b/airflow/kubernetes/pod_launcher_deprecated.py @@ -149,7 +149,8 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, Optional[str]] logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec) for line in logs: timestamp, message = self.parse_log_line(line.decode('utf-8')) - last_log_time = pendulum.parse(timestamp) + if timestamp: + last_log_time = pendulum.parse(timestamp) self.log.info(message) time.sleep(1) @@ -174,7 +175,7 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, Optional[str]] time.sleep(2) return self._task_status(self.read_pod(pod)), result - def parse_log_line(self, line: str) -> Tuple[str, str]: + def parse_log_line(self, line: str) -> Tuple[Optional[str], str]: """ Parse K8s log line and returns the final state @@ -184,7 +185,11 @@ def parse_log_line(self, line: str) -> Tuple[str, str]: """ split_at = line.find(' ') if split_at == -1: - raise Exception(f'Log not in "{{timestamp}} {{log}}" format. Got: {line}') + self.log.error( + f"Error parsing timestamp (no timestamp in message: '{line}'). " + "Will continue execution but won't update timestamp" + ) + return None, line timestamp = line[:split_at] message = line[split_at + 1 :].rstrip() return timestamp, message diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 4221ac2afd95e..3b3a7b7a9eb9b 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -269,7 +269,11 @@ def parse_log_line(self, line: str) -> Tuple[Optional[DateTime], str]: """ split_at = line.find(' ') if split_at == -1: - raise Exception(f'Log not in "{{timestamp}} {{log}}" format. 
Got: {line}') + self.log.error( + f"Error parsing timestamp (no timestamp in message '{line}'). " + "Will continue execution but won't update timestamp" + ) + return None, line timestamp = line[:split_at] message = line[split_at + 1 :].rstrip() try: diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index e5526a73f94df..8070c3c3532b5 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -14,6 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import logging from unittest import mock from unittest.mock import MagicMock @@ -234,8 +235,11 @@ def test_parse_log_line(self): assert timestamp == pendulum.parse(real_timestamp) assert line == log_message - with pytest.raises(Exception): + def test_parse_invalid_log_line(self, caplog): + with caplog.at_level(logging.INFO): self.pod_manager.parse_log_line('2020-10-08T14:16:17.793417674ZInvalidmessage\n') + assert "Invalidmessage" in caplog.text + assert "no timestamp in message" in caplog.text @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async") def test_start_pod_retries_on_409_error(self, mock_run_pod_async):