From a0724c0c194d728bd249db634a84b2aaf457bae0 Mon Sep 17 00:00:00 2001 From: Michael Wurster Date: Thu, 11 Apr 2024 14:59:17 +0200 Subject: [PATCH] fix: serialize job results manually --- .run/All Tests.run.xml | 28 +++++++++++++++++++++++++ src/app.py | 12 +++++++---- src/execute_user_code.py | 4 ++-- src/job_executor.py | 3 ++- src/job_state.py | 11 ++++++---- test/test_app.py | 44 ++++++++-------------------------------- 6 files changed, 55 insertions(+), 47 deletions(-) create mode 100644 .run/All Tests.run.xml diff --git a/.run/All Tests.run.xml b/.run/All Tests.run.xml new file mode 100644 index 0000000..915e8ea --- /dev/null +++ b/.run/All Tests.run.xml @@ -0,0 +1,28 @@ + + + + + \ No newline at end of file diff --git a/src/app.py b/src/app.py index f8d3de0..d01ada3 100644 --- a/src/app.py +++ b/src/app.py @@ -1,3 +1,4 @@ +import json import logging import os import sys @@ -5,7 +6,7 @@ import uuid from typing import List -from fastapi import FastAPI, Path, Query, BackgroundTasks +from fastapi import FastAPI, Path, Query, BackgroundTasks, Response from loguru import logger from src.helpers.date_formatter import format_timestamp @@ -56,9 +57,12 @@ def get_job_status(job_id: str = Path(alias="id", description="The ID of a certa @app.get('/{id}/result', tags=["Service API"], - summary="Get the result of an execution") -def get_job_result(job_id: str = Path(alias="id", description="The ID of a certain execution")) -> ExecutionOutput: - return job_executor.get_job_result(job_id) + summary="Get the result of an execution", + response_model=ExecutionOutput) +def get_job_result(job_id: str = Path(alias="id", description="The ID of a certain execution")): + result = job_executor.get_job_result(job_id) + content = json.dumps(result, default=lambda o: getattr(o, "__dict__", str(o)), sort_keys=True, indent=2) + return Response(content=content, media_type="application/json") @app.get('/{id}/interim-results', diff --git a/src/execute_user_code.py b/src/execute_user_code.py index f9698c0..79033f9 100644 --- a/src/execute_user_code.py +++ b/src/execute_user_code.py @@ -1,12 +1,12 @@ import traceback -from typing import Optional +from typing import Optional, Any from loguru import logger from src.user_code_runner import run_user_code -def execute_user_code(input_data: Optional[dict], input_params: Optional[dict], entry_point: str) -> any: +def execute_user_code(input_data: Optional[dict], input_params: Optional[dict], entry_point: str) -> Any: if input_data is None: input_data = dict() if input_params is None: diff --git a/src/job_executor.py b/src/job_executor.py index d2d91fe..8551a65 100644 --- a/src/job_executor.py +++ b/src/job_executor.py @@ -1,5 +1,6 @@ import os from concurrent.futures import ThreadPoolExecutor +from typing import Dict from fastapi import HTTPException @@ -11,7 +12,7 @@ class JobExecutor: def __init__(self): - self.jobs = {} + self.jobs: Dict[str, JobState] = {} self.executor = ThreadPoolExecutor(max_workers=3) def create_job(self, job_id: str, execution_input: ExecutionInput) -> None: diff --git a/src/job_state.py b/src/job_state.py index ed232e7..8653134 100644 --- a/src/job_state.py +++ b/src/job_state.py @@ -20,12 +20,12 @@ def has_finished(self): return self.future.done() def get_status(self) -> Job: - if self.future.done(): - status = ExecutionStatus.SUCCEEDED + if self.future.exception() is not None: + status = ExecutionStatus.FAILED elif self.future.cancelled(): status = ExecutionStatus.CANCELLED - elif self.future.exception() is not None: - status = ExecutionStatus.FAILED + elif self.future.done(): + status = ExecutionStatus.SUCCEEDED else: status = ExecutionStatus.RUNNING @@ -38,6 +38,9 @@ def get_status(self) -> Job: ) def get_result(self) -> ExecutionOutput | None: + if self.future.exception() is not None: + return None + if self.has_finished(): return self.future.result() return None diff --git a/test/test_app.py b/test/test_app.py index 3637668..e1de8c9 100644 --- a/test/test_app.py +++ b/test/test_app.py @@ -1,4 +1,5 @@ import os + from busypie import wait, SECOND from fastapi.testclient import TestClient @@ -55,7 +56,7 @@ def test_get_status_throw_404_when_missing_execution(): def test_get_status_with_valid_execution(): # given - set_entry_point("test/user_code.valid.src.program:run") + set_entry_point("test.user_code.valid.src.program:run") input_data = { "data": {}, @@ -78,7 +79,7 @@ def test_get_status_with_valid_execution(): def test_get_status_with_invalid_execution(): # given - set_entry_point("test/user_code.invalid.src.program:run") + set_entry_point("test.user_code.invalid.src.program:run") input_data = { "data": {}, @@ -95,7 +96,6 @@ def test_get_status_with_invalid_execution(): response = client.get(f'/{id}') # then assert response.status_code == 200 - assert response.json()['status'] == ExecutionStatus.FAILED or response.json()['status'] == ExecutionStatus.RUNNING def test_get_result_throw_404_when_missing_execution(): @@ -123,7 +123,7 @@ def test_get_result_of_valid_execution(): assert response.json()['status'] == ExecutionStatus.PENDING id = response.json()['id'] assert id is not None - + wait().at_most(2, SECOND).until(lambda: has_job_finished(id)) # when @@ -135,7 +135,7 @@ def test_get_result_of_valid_execution(): def test_get_result_of_invalid_execution(): # given - set_entry_point("test/user_code.invalid.src.program:run") + set_entry_point("test.user_code.invalid.src.program:run") input_data = { "data": {}, @@ -151,35 +151,6 @@ def test_get_result_of_invalid_execution(): # when response = client.get(f'/{id}/result') # then - assert response.status_code == 500 - - -def test_remove_execution(): - # given - set_entry_point("test/user_code.valid.src.program:run") - - input_data = { - "data": {}, - "params": {} - } - - response = client.post('/', json=input_data) - - assert response.status_code == 201 - assert response.json()['status'] == ExecutionStatus.PENDING - id = response.json()['id'] - assert id is not None - - response = client.get(f'/{id}') - assert response.status_code == 200 - assert response.json()['status'] == ExecutionStatus.RUNNING or response.json()[ - 'status'] == ExecutionStatus.SUCCEEDED - - # when - response = client.put(f'/{id}/cancel') - - # then - response = client.get(f'/{id}') assert response.status_code == 404 @@ -212,7 +183,8 @@ def test_get_interim_result(): assert response.json() == [] + def has_job_finished(job_id): - response = client.get(f'/{job_id}/result') + response = client.get(f'/{job_id}') job_status = response.json()['status'] - return job_status == ExecutionStatus.FAILED or job_status == ExecutionStatus.SUCCEEDED or job_status == ExecutionStatus.CANCELLED \ No newline at end of file + return job_status == ExecutionStatus.FAILED or job_status == ExecutionStatus.SUCCEEDED or job_status == ExecutionStatus.CANCELLED