Infinite Loop Symptoms on Prefect Server #15607
Comments
Workaround:

DROP INDEX IF EXISTS uq_task_run_state__task_run_id_timestamp_desc;
CREATE INDEX uq_task_run_state__task_run_id_timestamp_desc
    ON task_run_state (task_run_id, timestamp DESC);
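For anyone applying this from Python rather than psql, here is a minimal sketch using SQLAlchemy; the connection URL is a placeholder for wherever your Prefect server's PostgreSQL database lives. The workaround only recreates the unique index as a non-unique one, so duplicated (task_run_id, timestamp) rows no longer violate a constraint during event processing; it does not remove the duplicates themselves or fix the underlying race.

# Hedged sketch: relax the unique index so duplicate (task_run_id, timestamp)
# state rows stop raising integrity errors. The connection URL is a placeholder;
# point it at the database your Prefect server uses.
from sqlalchemy import create_engine, text

DATABASE_URL = "postgresql+psycopg2://prefect:prefect@localhost:5432/prefect"  # placeholder

engine = create_engine(DATABASE_URL)

with engine.begin() as conn:
    conn.execute(text(
        "DROP INDEX IF EXISTS uq_task_run_state__task_run_id_timestamp_desc"
    ))
    conn.execute(text(
        "CREATE INDEX uq_task_run_state__task_run_id_timestamp_desc "
        "ON task_run_state (task_run_id, timestamp DESC)"
    ))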
Thanks for the bug report @a14e! Do you have a flow that you can reliably reproduce this with? I suspect that there's something strange happening with the task run recorder, but it will be easier to confirm with a clear way to reproduce the issue.
@desertaxle Thank you for your reply! My flow, simplified, looks like this:

from prefect import flow, task, serve
from typing import List


class DirectusLoaderTasks:
    @task(log_prints=True, name="Directus. Load Topic")
    async def get_topic_by_id(self, topic_id: str) -> str:
        return "123"


class GrammarDBTasks:
    @task(log_prints=True, name="DB. Load items for topic")
    async def load_all_items_from_db(self, topic_id: str) -> list[str]:
        return []

    @task(log_prints=True, name="DB. Load item groups for topic")
    async def load_all_groups_from_db(self, topic_id: str) -> list[str]:
        return []

    @task(log_prints=True, name="Simple filter")
    async def simple_filter(from_db: List[str]) -> List[str]:
        return []


class OpenAiTasks:
    @task(name="Open AI. filter3", retries=3)
    async def filter3(self,
                      groups: List[str],
                      topic: str) -> List[str]:
        return []

    @task(name="Open AI. filter1", retries=3)
    async def filter1(self,
                      groups: List[str],
                      topic: str) -> List[str]:
        return []

    @task(name="Open AI. filter2", retries=3)
    async def filter2(self,
                      groups: List[str],
                      groups_in_db: List[int],
                      topic: str) -> List[str]:
        return []

    @task(name="Open AI. Normalize", retries=3)
    async def normalize(self,
                        groups: List[str],
                        topic: str) -> List[str]:
        return []


@flow(log_prints=True, name="Item Generation flow")
async def generate_items_flow():
    directus_tasks = DirectusLoaderTasks()
    grammar_tasks = GrammarDBTasks()
    open_ai_tasks = OpenAiTasks()

    topic_id = await directus_tasks.get_topic_by_id("123")
    item_groups_from_db = await grammar_tasks.load_all_groups_from_db(topic_id)
    items_from_db = await grammar_tasks.load_all_items_from_db(topic_id)
    new_item_groups = await GrammarDBTasks.simple_filter(items_from_db)

    deduplicated_response: List[str] = await open_ai_tasks.filter1(
        new_item_groups,
        topic_id
    )
    filtered_response: List[str] = await open_ai_tasks.filter2(
        deduplicated_response,
        item_groups_from_db,
        topic_id
    )
    filtered_response = await open_ai_tasks.filter3(filtered_response,
                                                    topic_id)
    normalize_forms = await open_ai_tasks.normalize(filtered_response,
                                                    topic_id)
    return


if __name__ == "__main__":
    example_deploy = generate_items_flow.to_deployment(
        "Generate Grammar Item Groups",
        tags=["tag1", "tag2", "tag3"]
    )
    serve(example_deploy)

Here's a video showing what the PostgreSQL logs look like in the terminal: Recording.2024-10-08.232607.mp4
I have the exact same problem. It occurs in a local setup with a PostgreSQL database and the Prefect server running in a simple docker-compose project. The occurrence rate is quite high; it happens even with a pipeline as simple as this:

from prefect import flow, task
import pandas as pd
import time
import os
import random


@task(name="Transform Data 1")
def transform_data_1(path) -> pd.DataFrame:
    print(path)
    data = {"id": [1, 2, 3], "value": [10, 20, 30]}
    time.sleep(random.randint(1, 10))
    return pd.DataFrame(data)


@task(name="Transform Data 2")
def transform_data_2(path) -> pd.DataFrame:
    print(path)
    data = {"id": [4, 5, 6], "value": [40, 50, 60]}
    time.sleep(random.randint(1, 10))
    return pd.DataFrame(data)


@flow
def process_data(file_path: str):
    df1 = transform_data_1(file_path)
    print("Done 1")
    df2 = transform_data_2(file_path)
    print("Done 2")


@flow(name="Example Pipeline")
def example_pipeline():
    for i in range(5):
        process_data(
            file_path=f"file_{i}.csv",
        )


if __name__ == "__main__":
    os.environ["PREFECT_API_URL"] = "http://localhost:4200"
    example_pipeline()

I have tried rewriting this into async calls, but it had no effect. I also tried both the postgres:16-alpine and postgres:17-alpine Docker images for Postgres, and different versions of the Python images for the Prefect server as well. My version of prefect:
The proposed workaround with the constraint modification on the database side works, but the container running the server consumes an enormous amount of CPU and we have to restart it to break the loop.
Thanks for the additional info @a14e and @rejner! I haven't been able to reproduce this on my end, but it looks like there's a race condition where a state change for a task run can be emitted twice with the exact same timestamp. I haven't yet tracked down where that's happening, but the issue is exacerbated because we keep retrying event processing when there's a failure, which causes this infinite loop. I think I can address the infinite loop by adding a dead-letter queue to the task run recorder. The race condition will still exist, but it won't overwhelm the server CPU when it happens, and we may get more information about the states that are causing the underlying issue.
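For illustration only (this is not Prefect's actual task run recorder code), a minimal sketch of the dead-letter-queue pattern described above: a failing event is retried a bounded number of times and then parked for later inspection instead of being requeued forever, which is what keeps a single poisonous event from pinning the CPU. The Event type, record_state handler, and queue names are hypothetical stand-ins.

# Hedged sketch of a retry loop with a dead-letter queue; the event type,
# handler, and queue here are illustrative stand-ins, not Prefect internals.
import asyncio
from dataclasses import dataclass

MAX_ATTEMPTS = 3


@dataclass
class Event:
    task_run_id: str
    timestamp: str
    attempts: int = 0


async def record_state(event: Event) -> None:
    """Stand-in for the real event handler; may raise on duplicate timestamps."""
    ...


async def process(queue: asyncio.Queue, dead_letter: list[Event]) -> None:
    while True:
        event = await queue.get()
        try:
            await record_state(event)
        except Exception:
            event.attempts += 1
            if event.attempts >= MAX_ATTEMPTS:
                # Park the poisonous event instead of retrying it forever.
                dead_letter.append(event)
            else:
                # Bounded retry: put it back for another attempt.
                await queue.put(event)
        finally:
            queue.task_done()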
Thank you @desertaxle for such a quick response! Your proposed solution seems reasonable for now, and I am looking forward to the fix ;)
Bug summary
I ran into an issue with Prefect while working locally that looks like an endless loop happening on the backend.
I've observed the following behavior locally several times:
Logs:
Version info (prefect version output)

Additional context
PostgreSQL version: 16