Passing a DagRun to a PythonVirtualenvOperator gives NameError: name 'timedelta' is not defined #35483

Closed
1 of 2 tasks
Felix-neko opened this issue Nov 6, 2023 · 7 comments
Labels
area:core, kind:bug, pending-response, stale

Comments

@Felix-neko

Felix-neko commented Nov 6, 2023

Apache Airflow version

2.7.3

What happened

I have a simple DAG (with the render_template_as_native_obj option enabled). I'm trying to pass {{ dag_run }} to a PythonVirtualenvOperator, but when the task starts executing it fails with NameError: name 'timedelta' is not defined.

I'm trying to run it on Python 3.11, airflow==2.7.3 and dill==0.3.7.

Here's my DAG:

import datetime
from pathlib import Path
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
import dill


dag = DAG(
    dag_id='strange_pickling_error_dag',
    schedule_interval='0 5 * * 1',
    start_date=datetime.datetime(2020, 1, 1),
    catchup=False,
    render_template_as_native_obj=True,
)


context = {"ts": "{{ ts }}", "dag_run": "{{ dag_run }}"}
op_args = [context, Path(__file__).parent.absolute()]


def make_foo(*args, **kwargs):
    print("---> making foo!")
    print("make foo(...): args")
    print(args)
    print("make foo(...): kwargs")
    print(kwargs)


make_foo_task = PythonVirtualenvOperator(
    task_id='make_foo',
    python_callable=make_foo,
    use_dill=True,
    system_site_packages=False,
    op_args=op_args,
    requirements=[f"dill=={dill.__version__}", f"apache-airflow=={airflow.__version__}"],
    dag=dag)

And here's my error:

[2023-11-06, 18:34:16 UTC] {process_utils.py:182} INFO - Executing cmd: /tmp/venvp8tw4qco/bin/python /tmp/venvp8tw4qco/script.py /tmp/venvp8tw4qco/script.in /tmp/venvp8tw4qco/script.out /tmp/venvp8tw4qco/string_args.txt /tmp/venvp8tw4qco/termination.log
[2023-11-06, 18:34:16 UTC] {process_utils.py:186} INFO - Output:
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO - Traceback (most recent call last):
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -   File "/tmp/venvp8tw4qco/script.py", line 17, in <module>
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -     arg_dict = dill.load(file)
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -                ^^^^^^^^^^^^^^^
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -   File "/tmp/venvp8tw4qco/lib/python3.11/site-packages/dill/_dill.py", line 287, in load
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -     return Unpickler(file, ignore=ignore, **kwds).load()
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -   File "/tmp/venvp8tw4qco/lib/python3.11/site-packages/dill/_dill.py", line 442, in load
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -     obj = StockUnpickler.load(self)
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -           ^^^^^^^^^^^^^^^^^^^^^^^^^
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -   File "/home/felix/Projects/fs_etl/venv_py3.11/lib/python3.11/site-packages/pendulum/tz/timezone.py", line 312, in __init__
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -     self._utcoffset = timedelta(seconds=offset)
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO -                       ^^^^^^^^^
[2023-11-06, 18:34:16 UTC] {process_utils.py:190} INFO - NameError: name 'timedelta' is not defined
[2023-11-06, 18:34:17 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/felix/Projects/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 395, in execute
    return super().execute(context=serializable_context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/felix/Projects/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 192, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/felix/Projects/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 609, in execute_callable
    result = self._execute_python_callable_in_subprocess(python_path, tmp_path)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/felix/Projects/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 446, in _execute_python_callable_in_subprocess
    execute_in_subprocess(
  File "/home/felix/Projects/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/utils/process_utils.py", line 171, in execute_in_subprocess
    execute_in_subprocess_with_kwargs(cmd, cwd=cwd)
  File "/home/felix/Projects/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/utils/process_utils.py", line 194, in execute_in_subprocess_with_kwargs
    raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/tmp/venvp8tw4qco/bin/python', '/tmp/venvp8tw4qco/script.py', '/tmp/venvp8tw4qco/script.in', '/tmp/venvp8tw4qco/script.out', '/tmp/venvp8tw4qco/string_args.txt', '/tmp/venvp8tw4qco/termination.log']' returned non-zero exit status 1.

What you think should happen instead

I think that this DagRun variable should be correctly unpickled.

How to reproduce

I'm running Airflow with the plain airflow standalone command in my virtualenv shell.
I'm using the default SequentialExecutor.

Operating System

Ubuntu 22.04

Versions of Apache Airflow Providers

apache-airflow-providers-common-sql==1.8.0
apache-airflow-providers-ftp==3.6.0
apache-airflow-providers-http==4.6.0
apache-airflow-providers-imap==3.4.0
apache-airflow-providers-sqlite==3.5.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

I'm using a Python 3.11 virtualenv.
Here's my pip3 freeze:

aiohttp==3.8.6
aiosignal==1.3.1
alembic==1.12.1
annotated-types==0.6.0
anyio==4.0.0
apache-airflow==2.7.3
apache-airflow-providers-common-sql==1.8.0
apache-airflow-providers-ftp==3.6.0
apache-airflow-providers-http==4.6.0
apache-airflow-providers-imap==3.4.0
apache-airflow-providers-sqlite==3.5.0
apispec==6.3.0
argcomplete==3.1.4
asgiref==3.7.2
async-timeout==4.0.3
attrs==23.1.0
Babel==2.13.1
backoff==2.2.1
blinker==1.7.0
cachelib==0.9.0
cattrs==23.1.2
certifi==2023.7.22
cffi==1.16.0
charset-normalizer==3.3.2
click==8.1.7
clickclick==20.10.2
colorama==0.4.6
colorlog==4.8.0
ConfigUpdater==3.1.1
connexion==2.14.2
cron-descriptor==1.4.0
croniter==2.0.1
cryptography==41.0.5
Deprecated==1.2.14
dill==0.3.7
distlib==0.3.7
dnspython==2.4.2
docutils==0.20.1
email-validator==1.3.1
filelock==3.13.1
Flask==2.2.5
Flask-AppBuilder==4.3.6
Flask-Babel==2.0.0
Flask-Caching==2.1.0
Flask-JWT-Extended==4.5.3
Flask-Limiter==3.5.0
Flask-Login==0.6.3
Flask-Session==0.5.0
Flask-SQLAlchemy==2.5.1
Flask-WTF==1.2.1
frozenlist==1.4.0
google-re2==1.1
googleapis-common-protos==1.61.0
graphviz==0.20.1
greenlet==3.0.1
grpcio==1.59.2
gunicorn==21.2.0
h11==0.14.0
httpcore==1.0.1
httpx==0.25.1
idna==3.4
importlib-metadata==6.8.0
importlib-resources==6.1.0
inflection==0.5.1
itsdangerous==2.1.2
Jinja2==3.1.2
jsonschema==4.19.2
jsonschema-specifications==2023.7.1
lazy-object-proxy==1.9.0
limits==3.6.0
linkify-it-py==2.0.2
lockfile==0.12.2
Mako==1.2.4
Markdown==3.5.1
markdown-it-py==3.0.0
MarkupSafe==2.1.3
marshmallow==3.20.1
marshmallow-oneofschema==3.0.1
marshmallow-sqlalchemy==0.26.1
mdit-py-plugins==0.4.0
mdurl==0.1.2
multidict==6.0.4
numpy==1.26.1
opentelemetry-api==1.20.0
opentelemetry-exporter-otlp==1.20.0
opentelemetry-exporter-otlp-proto-common==1.20.0
opentelemetry-exporter-otlp-proto-grpc==1.20.0
opentelemetry-exporter-otlp-proto-http==1.20.0
opentelemetry-proto==1.20.0
opentelemetry-sdk==1.20.0
opentelemetry-semantic-conventions==0.41b0
ordered-set==4.1.0
packaging==23.2
pathspec==0.11.2
pendulum==2.1.2
platformdirs==3.11.0
pluggy==1.3.0
prison==0.2.1
protobuf==4.25.0
psutil==5.9.6
pycparser==2.21
pydantic==2.4.2
pydantic_core==2.10.1
Pygments==2.16.1
PyJWT==2.8.0
python-daemon==3.0.1
python-dateutil==2.8.2
python-nvd3==0.15.0
python-slugify==8.0.1
pytz==2023.3.post1
pytzdata==2020.1
PyYAML==6.0.1
referencing==0.30.2
requests==2.31.0
requests-toolbelt==1.0.0
rfc3339-validator==0.1.4
rich==13.6.0
rich-argparse==1.4.0
rpds-py==0.12.0
setproctitle==1.3.3
six==1.16.0
sniffio==1.3.0
SQLAlchemy==1.4.50
SQLAlchemy-JSONField==1.0.1.post0
SQLAlchemy-Utils==0.41.1
sqlparse==0.4.4
tabulate==0.9.0
tenacity==8.2.3
termcolor==2.3.0
text-unidecode==1.3
typing_extensions==4.8.0
uc-micro-py==1.0.2
unicodecsv==0.14.1
urllib3==2.0.7
virtualenv==20.24.6
Werkzeug==2.2.3
wrapt==1.15.0
WTForms==3.0.1
yarl==1.9.2
zipp==3.17.0

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

@Felix-neko Felix-neko added the area:core, kind:bug and needs-triage labels Nov 6, 2023
@Felix-neko
Author

Felix-neko commented Nov 6, 2023

UPD:

If I simply add these two lines

serialized_dag = dill.dumps(dag)
dill.loads(serialized_dag)

to the end of my DAG's code and execute it in PyCharm (in the same virtualenv that my Airflow executor runs in), I get the same NameError: name 'timedelta' is not defined.
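
A round-trip check like the one above can also be run value by value, to see which entry of a context dict is the one that fails. The following is a minimal sketch, not part of the original report: the helper name find_round_trip_failures is hypothetical, and it defaults to the standard-library pickle so that it runs without extra packages. dill.dumps and dill.loads can be passed in instead, since they take the same arguments.

```python
import pickle
from typing import Any, Callable, Dict


def find_round_trip_failures(
    values: Dict[str, Any],
    dumps: Callable[[Any], bytes] = pickle.dumps,
    loads: Callable[[bytes], Any] = pickle.loads,
) -> Dict[str, str]:
    """Return {key: error description} for every value that fails dumps -> loads.

    Hypothetical helper for illustration; it only checks a round trip inside
    the current interpreter, not inside the virtualenv the operator creates.
    """
    failures: Dict[str, str] = {}
    for key, value in values.items():
        try:
            loads(dumps(value))
        except Exception as exc:  # serializers raise many different exception types
            failures[key] = f"{type(exc).__name__}: {exc}"
    return failures


if __name__ == "__main__":
    # A lambda is used here only because stdlib pickle is known to reject it.
    sample = {"ts": "2023-11-06T18:34:16+00:00", "callback": lambda: None}
    print(find_round_trip_failures(sample))
```

To mirror the report, one would presumably call find_round_trip_failures({"dag": dag}, dumps=dill.dumps, loads=dill.loads) at the end of the DAG file; that call has not been verified against Airflow 2.7.3.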

@Taragolis
Contributor

I assume this might have the same cause as described here: #35307

@jscheffl
Contributor

jscheffl commented Nov 6, 2023

Good catch @Taragolis :-D I assumed the same when I read this.

@Felix-neko Is there a specific reason to pass the full DagRun object into the context? Would it help as workaround to only pass in specific details of the full DagRun? Which details would you need in the DagRun for the execution? (Just thinking about a workaround to un-block you)
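
The workaround suggested above could look roughly like the following sketch, which hands over only plain built-in values instead of the whole DagRun. DagRunStub and extract_dag_run_details are hypothetical names introduced for illustration. The attribute names run_id, logical_date and conf mirror attributes of Airflow's DagRun, but the stub is not the real class and the sketch does not import Airflow.

```python
import datetime
import pickle
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class DagRunStub:
    """Hypothetical stand-in for airflow.models.DagRun, for illustration only."""

    run_id: str
    logical_date: datetime.datetime
    conf: Dict[str, Any] = field(default_factory=dict)


def extract_dag_run_details(dag_run: Any) -> Dict[str, Any]:
    """Copy a few attributes into a dict that contains built-in types only."""
    return {
        "run_id": str(dag_run.run_id),
        # An ISO 8601 string avoids shipping a timezone object between processes.
        "logical_date": dag_run.logical_date.isoformat(),
        "conf": dict(dag_run.conf or {}),
    }


if __name__ == "__main__":
    run = DagRunStub(
        run_id="manual__2023-11-06T18:34:16+00:00",
        logical_date=datetime.datetime(
            2023, 11, 6, 18, 34, 16, tzinfo=datetime.timezone.utc
        ),
    )
    details = extract_dag_run_details(run)
    # Plain strings and dicts survive a pickle round trip unchanged.
    assert pickle.loads(pickle.dumps(details)) == details
    print(details)
```

In the DAG from the report, the equivalent would presumably be templated strings such as "{{ dag_run.run_id }}" in op_args rather than "{{ dag_run }}"; this has not been tested against the reported setup.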

@Felix-neko
Author

Felix-neko commented Nov 6, 2023

> @Felix-neko Is there a specific reason to pass the full DagRun object into the context? Would it help as workaround to only pass in specific details of the full DagRun? Which details would you need in the DagRun for the execution? (Just thinking about a workaround to un-block you)

@jscheffl: alas, yes, we have such reasons. We have many DAGs that use DagRun, Dag and TaskInstance instances as objects and extract data from them inside operators.

And we have a homemade library that uses airflow and also extracts data from DagRun and TaskInstance objects inside operators. This feature is really helpful for us.

@Taragolis
Contributor

Hey @Felix-neko, a lot of time has passed since the last activity on this issue. Any chance you have checked it on Airflow 2.8.1 with pendulum 3?
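
Before re-testing, it may help to confirm which versions are actually installed in the interpreter that runs the DAG. The following is a small sketch using only the standard library; the helper name installed_versions is hypothetical, and the three distribution names are the ones mentioned in this thread.

```python
from importlib import metadata
from typing import Dict, Iterable, Optional


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    found = installed_versions(["apache-airflow", "pendulum", "dill"])
    for name, version in found.items():
        print(f"{name}=={version}" if version else f"{name}: not installed")
```

The same check would need to be run separately inside the temporary virtualenv if the versions there are in question, since it only inspects the interpreter it is executed with.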


This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

@github-actions github-actions bot added the stale label Mar 15, 2024

This issue has been closed because it has not received response from the issue author.

@github-actions github-actions bot closed this as not planned Mar 22, 2024