Adding OTEL labels on flow runs breaks ECS push work pool workers #16066

Closed
krishnan-chandra opened this issue Nov 20, 2024 · 5 comments · Fixed by #16067
Labels
bug Something isn't working

Comments

@krishnan-chandra

krishnan-chandra commented Nov 20, 2024

Bug summary

We run push work pools on AWS ECS, orchestrated through Prefect Cloud. We recently upgraded from Prefect 2 to 3, and our ECS containers started failing with the error below:

Traceback (most recent call last):
File "/bin/app/lib/python3.12/site-packages/prefect/engine.py", line 37, in <module>
flow_run, flow = load_flow_and_flow_run(flow_run_id=flow_run_id)

File "/bin/app/lib/python3.12/site-packages/prefect/flow_engine.py", line 102, in load_flow_and_flow_run
flow_run = client.read_flow_run(flow_run_id)

File "/bin/app/lib/python3.12/site-packages/prefect/client/orchestration.py", line 3923, in read_flow_run
return FlowRun.model_validate(response.json())

File "/bin/app/lib/python3.12/site-packages/pydantic/main.py", line 596, in model_validate
return cls.__pydantic_validator__.validate_python(

pydantic_core._pydantic_core.ValidationError: 1 validation error for FlowRun
labels
Input should be a valid dictionary [type=dict_type, input_value=None, input_type=NoneType]
For further information visit https://errors.pydantic.dev/2.9/v/dict_type

00:46:23.724 | ERROR | prefect.engine - Engine execution of flow run '10b5cda5-a0fa-4ad5-a9fe-b08d89925e17' exited with unexpected exception

The labels field on flow runs was first introduced in this PR and was released in Prefect version 3.1.3. Downgrading to version 3.1.2 fixes this issue.

We do not have OpenTelemetry installed/configured, which might be why labels is a null value.

A solution may be to make the labels field optional, or add in a Pydantic BeforeValidator to coerce None to an empty dictionary.
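
A rough sketch of the BeforeValidator idea, using a stand-in model rather than Prefect's real FlowRun class. The names FlowRunSketch, Labels, and none_to_empty_dict are made up for illustration, and this is not necessarily how the eventual fix was implemented:

```python
from typing import Annotated, Any, Dict

from pydantic import BaseModel, BeforeValidator, Field


def none_to_empty_dict(value: Any) -> Any:
    """Turn a null payload into an empty dict before dict validation runs."""
    return {} if value is None else value


# Hypothetical alias: a dict field that tolerates None on input.
Labels = Annotated[Dict[str, Any], BeforeValidator(none_to_empty_dict)]


class FlowRunSketch(BaseModel):
    """Stand-in for FlowRun; only the field relevant to this issue."""

    labels: Labels = Field(default_factory=dict)


# A server response with "labels": null no longer raises dict_type.
run = FlowRunSketch.model_validate({"labels": None})
print(run.labels)
```

With this approach the field type stays a plain dict for downstream code, so callers would not need to handle None themselves, which is the main difference from simply making the field optional.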

Version info

Version:             3.1.2
API version:         0.8.4
Python version:      3.12.4
Git commit:          02b99f0a
Built:               Tue, Nov 12, 2024 1:38 PM
OS/Arch:             darwin/arm64
Profile:             github-actions-sixai
Server type:         cloud
Pydantic version:    2.10.0b2
Integrations:
  prefect-aws:       0.5.2

Additional context

No response

@krishnan-chandra krishnan-chandra added the bug Something isn't working label Nov 20, 2024
@zzstoatzz
Collaborator

zzstoatzz commented Nov 20, 2024

thanks for the report @krishnan-chandra! we will take a look and update here


noting for later: i have reproduced this trivially

importing prefect.main re #15957

In [1]: import prefect.main; import uuid;

In [2]: from prefect.client.schemas.objects import FlowRun

In [3]: FlowRun(flow_id=uuid.uuid4(), labels=None)
---------------------------------------------------------------------------
ValidationError                           Traceback (most recent call last)
Cell In[3], line 1
----> 1 FlowRun(flow_id=uuid.uuid4(), labels=None)

File ~/github.com/prefecthq/prefect/.venv/lib/python3.12/site-packages/pydantic/main.py:212, in BaseModel.__init__(self, **data)
    210 # `__tracebackhide__` tells pytest and some other tools to omit this function from tracebacks
    211 __tracebackhide__ = True
--> 212 validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
    213 if self is not validated_self:
    214     warnings.warn(
    215         'A custom validator is returning a value other than `self`.\n'
    216         "Returning anything other than `self` from a top level model validator isn't supported when validating via `__init__`.\n"
    217         'See the `model_validator` docs (https://docs.pydantic.dev/latest/concepts/validators/#model-validators) for more details.',
    218         category=None,
    219     )

ValidationError: 1 validation error for FlowRun
labels
  Input should be a valid dictionary [type=dict_type, input_value=None, input_type=NoneType]
    For further information visit https://errors.pydantic.dev/2.9/v/dict_type

but it's not yet clear why labels=None is passed in that context


edit: this reproduces the problem in cloud run push pools (with no other job variables)

from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://gist.github.com/3eb9333625d465cb74381c5116be7aee.git",
        entrypoint="tour_of_artifacts.py:main",
    ).deploy(
        name="repro-16066",
        work_pool_name="sbx-cloud-run-push",
    )

though serving or using a process worker locally appears to work fine

@zzstoatzz
Collaborator

zzstoatzz commented Nov 20, 2024

this was automatically closed by github upon merging the PR linked above but i'll keep this open until we confirm things are working

@zzstoatzz zzstoatzz reopened this Nov 20, 2024
@krishnan-chandra
Author

Thanks! If a new dev release comes out with the fix I'm happy to use that to test on my end.

@krishnan-chandra
Author

This has been fixed with Prefect 3.1.4, thanks for the quick turnaround!

@tkramer-motion

We are experiencing a similar issue when we have schedules enabled for a flow. The issue goes away if no flows with schedules are on a worker.

+ Exception Group Traceback (most recent call last):
 | File "/usr/local/lib/python3.12/site-packages/prefect/workers/base.py", line 553, in start
 | async with anyio.create_task_group() as loops_task_group:
 | ^^^^^^^^^^^^^^^^^^^^^^^^^
 | File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 763, in __aexit__
 | raise BaseExceptionGroup(
 | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
 +-+---------------- 1 ----------------
 | Traceback (most recent call last):
 | File "/usr/local/lib/python3.12/site-packages/prefect/utilities/services.py", line 63, in critical_service_loop
 | await workload()
 | File "/usr/local/lib/python3.12/site-packages/prefect/workers/base.py", line 693, in get_and_submit_flow_runs
 | runs_response = await self._get_scheduled_flow_runs()
 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 | File "/usr/local/lib/python3.12/site-packages/prefect/workers/base.py", line 848, in _get_scheduled_flow_runs
 | await self._client.get_scheduled_flow_runs_for_work_pool(
 | File "/usr/local/lib/python3.12/site-packages/prefect/client/orchestration.py", line 2899, in get_scheduled_flow_runs_for_work_pool
 | return pydantic.TypeAdapter(List[WorkerFlowRunResponse]).validate_python(
 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 | File "/usr/local/lib/python3.12/site-packages/pydantic/type_adapter.py", line 135, in wrapped
 | return func(self, *args, **kwargs)
 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^
 | File "/usr/local/lib/python3.12/site-packages/pydantic/type_adapter.py", line 366, in validate_python
 | return self.validator.validate_python(object, strict=strict, from_attributes=from_attributes, context=context)
 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 | pydantic_core._pydantic_core.ValidationError: 1 validation error for list[WorkerFlowRunResponse]
 | 0.flow_run.labels
 | Input should be a valid dictionary [type=dict_type, input_value=None, input_type=NoneType]
 | For further information visit https://errors.pydantic.dev/2.9/v/dict_type
 +------------------------------------
During handling of the above exception, another exception occurred:
 + Exception Group Traceback (most recent call last):
 | File "/usr/local/lib/python3.12/site-packages/prefect/cli/_utilities.py", line 42, in wrapper
 | return fn(*args, **kwargs)
 | ^^^^^^^^^^^^^^^^^^^
 | File "/usr/local/lib/python3.12/site-packages/prefect/cli/_types.py", line 153, in sync_fn
 | return asyncio.run(async_fn(*args, **kwargs))
 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 | File "/usr/local/lib/python3.12/asyncio/runners.py", line 194, in run
 | return runner.run(main)
 | ^^^^^^^^^^^^^^^^
 | File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run
 | return self._loop.run_until_complete(task)
 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 | File "/usr/local/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
 | return future.result()
 | ^^^^^^^^^^^^^^^
 | File "/usr/local/lib/python3.12/site-packages/prefect/cli/worker.py", line 167, in start
 | await worker.start(
 | File "/usr/local/lib/python3.12/site-packages/prefect/workers/base.py", line 549, in start
 | async with self as worker:
 | ^^^^	
 | File "/usr/local/lib/python3.12/site-packages/prefect/workers/base.py", line 1240, in __aexit__
 | await self.teardown(*exc_info)
 | File "/usr/local/lib/python3.12/site-packages/prefect/workers/base.py", line 659, in teardown
 | await self._exit_stack.__aexit__(*exc_info)
 | File "/usr/local/lib/python3.12/contextlib.py", line 754, in __aexit__
 | raise exc_details[1]
 | File "/usr/local/lib/python3.12/contextlib.py", line 737, in __aexit__
 | cb_suppress = await cb(*exc_details)
 | ^^^^^^^^^^^^^^^^^^^^^^
 | File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 763, in __aexit__
 | raise BaseExceptionGroup(
 | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
 +-+---------------- 1 ----------------
 | Exception Group Traceback (most recent call last):
 | File "/usr/local/lib/python3.12/site-packages/prefect/workers/base.py", line 553, in start
 | async with anyio.create_task_group() as loops_task_group:
 | ^^^^^^^^^^^^^^^^^^^^^^^^^
 | File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 763, in __aexit__
 | raise BaseExceptionGroup(
 | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
 +-+---------------- 1 ----------------
 | Traceback (most recent call last):
 | File "/usr/local/lib/python3.12/site-packages/prefect/utilities/services.py", line 63, in critical_service_loop
 | await workload()
 | File "/usr/local/lib/python3.12/site-packages/prefect/workers/base.py", line 693, in get_and_submit_flow_runs
 | runs_response = await self._get_scheduled_flow_runs()
 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 | File "/usr/local/lib/python3.12/site-packages/prefect/workers/base.py", line 848, in _get_scheduled_flow_runs
 | await self._client.get_scheduled_flow_runs_for_work_pool(
 | File "/usr/local/lib/python3.12/site-packages/prefect/client/orchestration.py", line 2899, in get_scheduled_flow_runs_for_work_pool
 | return pydantic.TypeAdapter(List[WorkerFlowRunResponse]).validate_python(
 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 | File "/usr/local/lib/python3.12/site-packages/pydantic/type_adapter.py", line 135, in wrapped
 | return func(self, *args, **kwargs)
 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^
 | File "/usr/local/lib/python3.12/site-packages/pydantic/type_adapter.py", line 366, in validate_python
 | return self.validator.validate_python(object, strict=strict, from_attributes=from_attributes, context=context)
 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 | pydantic_core._pydantic_core.ValidationError: 1 validation error for list[WorkerFlowRunResponse]
 | 0.flow_run.labels
 | Input should be a valid dictionary [type=dict_type, input_value=None, input_type=NoneType]
 | For further information visit https://errors.pydantic.dev/2.9/v/dict_type
 +------------------------------------
An exception occurred.
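
For anyone reading the traceback above: the error location 0.flow_run.labels means the first item in the returned list has a flow_run whose labels is null. A small sketch that mirrors that shape with made-up models (FlowRunSketch and WorkerFlowRunResponseSketch are stand-ins, not Prefect's actual classes):

```python
from typing import Any, Dict, List

from pydantic import BaseModel, TypeAdapter, ValidationError


class FlowRunSketch(BaseModel):
    """Stand-in model with a required, non-nullable dict field."""

    labels: Dict[str, Any]


class WorkerFlowRunResponseSketch(BaseModel):
    """Stand-in for the response wrapper that nests a flow run."""

    flow_run: FlowRunSketch


adapter = TypeAdapter(List[WorkerFlowRunResponseSketch])


def error_locations(payload: Any) -> list:
    """Return (location, error type) pairs for a payload, or [] if it validates."""
    try:
        adapter.validate_python(payload)
    except ValidationError as exc:
        return [(err["loc"], err["type"]) for err in exc.errors()]
    return []


# One scheduled run with null labels is enough to fail the whole list.
locations = error_locations([{"flow_run": {"labels": None}}])
print(locations)
```

Because the whole list is validated in one call, a single run with null labels makes the entire poll fail, which would be consistent with the worker only crashing when scheduled runs are present.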
