Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional info about Segmentation Violation in LocalTaskJob #27381

Merged
merged 1 commit into from
Dec 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,36 @@
from airflow.utils.session import provide_session
from airflow.utils.state import State

SIGSEGV_MESSAGE = """
******************************************* Received SIGSEGV *******************************************
SIGSEGV (Segmentation Violation) signal indicates Segmentation Fault error which refers to
an attempt by a program/library to write or read outside its allocated memory.

In Python environment usually this signal refers to libraries which use low level C API.
Make sure that you use use right libraries/Docker Images
for your architecture (Intel/ARM) and/or Operational System (Linux/macOS).

Suggested way to debug
======================
- Set environment variable 'PYTHONFAULTHANDLER' to 'true'.
- Start airflow services.
- Restart failed airflow task.
- Check 'scheduler' and 'worker' services logs for additional traceback
which might contain information about module/library where actual error happen.

Known Issues
============

Note: Only Linux-based distros supported as "Production" execution environment for Airflow.

macOS
-----
1. Due to limitations in Apple's libraries not every process might 'fork' safe.
One of the general error is unable to query the macOS system configuration for network proxies.
If your are not using a proxy you could disable it by set environment variable 'no_proxy' to '*'.
See: https://github.com/python/cpython/issues/58037 and https://bugs.python.org/issue30385#msg293958
********************************************************************************************************"""


class LocalTaskJob(BaseJob):
"""LocalTaskJob runs a single task instance."""
Expand Down Expand Up @@ -83,6 +113,14 @@ def signal_handler(signum, frame):
self.task_runner.terminate()
self.handle_task_exit(128 + signum)

def segfault_signal_handler(signum, frame):
"""Setting sigmentation violation signal handler"""
self.log.critical(SIGSEGV_MESSAGE)
self.task_runner.terminate()
self.handle_task_exit(128 + signum)
raise AirflowException("Segmentation Fault detected.")

signal.signal(signal.SIGSEGV, segfault_signal_handler)
Comment on lines +116 to +123
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Third attempt for detect segfault and show info for end users by handling SIGSEGV signal.

@potiuk @uranusjr I'm not sure is this correct way to terminate task runner?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not sure either (and not sure even if this would work), but segmentation faults are not the easiest to test, so we might as well just merge this and see how things go…

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Locally it work fine but only with cases that I know (macOS specific stuff).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with @uranusjr . Let's merge and see. When you have SIGSEGV you are screwed anyway, we cannot get it any worse.

signal.signal(signal.SIGTERM, signal_handler)

if not self.task_instance.check_and_change_state_before_execution(
Expand Down
37 changes: 36 additions & 1 deletion tests/jobs/test_local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

import datetime
import logging
import os
import re
import signal
Expand All @@ -34,7 +35,7 @@
from airflow import settings
from airflow.exceptions import AirflowException
from airflow.executors.sequential_executor import SequentialExecutor
from airflow.jobs.local_task_job import LocalTaskJob
from airflow.jobs.local_task_job import SIGSEGV_MESSAGE, LocalTaskJob
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.models.dagbag import DagBag
from airflow.models.serialized_dag import SerializedDagModel
Expand Down Expand Up @@ -805,6 +806,40 @@ def task_function(ti):
assert "Received SIGTERM. Terminating subprocesses" in caplog.text
assert "Task exited with return code 143" in caplog.text

def test_process_sigsegv_error_message(self, caplog, dag_maker):
"""Test that shows error if process failed with segmentation fault."""
caplog.set_level(logging.CRITICAL, logger="local_task_job.py")

def task_function(ti):
# pytest enable faulthandler by default unless `-p no:faulthandler` is given.
# It can not be disabled on the test level out of the box and
# that mean debug traceback would show in pytest output.
# For avoid this we disable it within the task which run in separate process.
import faulthandler

if faulthandler.is_enabled():
faulthandler.disable()

while not ti.pid:
time.sleep(0.1)

os.kill(psutil.Process(os.getpid()).ppid(), signal.SIGSEGV)

with dag_maker(dag_id="test_segmentation_fault"):
task = PythonOperator(
task_id="test_sigsegv",
python_callable=task_function,
)
dag_run = dag_maker.create_dagrun()
ti = TaskInstance(task=task, run_id=dag_run.run_id)
ti.refresh_from_db()
job = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
settings.engine.dispose()
with timeout(10):
with pytest.raises(AirflowException, match=r"Segmentation Fault detected"):
job.run()
assert SIGSEGV_MESSAGE in caplog.messages


@pytest.fixture()
def clean_db_helper():
Expand Down