Skip to content
This repository has been archived by the owner on Jun 17, 2024. It is now read-only.

Commit

Permalink
fix: serialize job results manually
Browse files Browse the repository at this point in the history
  • Loading branch information
miwurster committed Apr 11, 2024
1 parent 26ac355 commit a0724c0
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 47 deletions.
28 changes: 28 additions & 0 deletions .run/All Tests.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="All Tests" type="tests" factoryName="Autodetect">
<module name="planqk-cli-serve" />
<option name="ENV_FILES" value="" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<EXTENSION ID="net.ashald.envfile">
<option name="IS_ENABLED" value="false" />
<option name="IS_SUBST" value="false" />
<option name="IS_PATH_MACRO_SUPPORTED" value="false" />
<option name="IS_IGNORE_MISSING_FILES" value="false" />
<option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
<ENTRIES>
<ENTRY IS_ENABLED="true" PARSER="runconfig" IS_EXECUTABLE="false" />
</ENTRIES>
</EXTENSION>
<option name="_new_additionalArguments" value="&quot;&quot;" />
<option name="_new_target" value="&quot;$PROJECT_DIR$/test&quot;" />
<option name="_new_targetType" value="&quot;PATH&quot;" />
<method v="2" />
</configuration>
</component>
12 changes: 8 additions & 4 deletions src/app.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import json
import logging
import os
import sys
import time
import uuid
from typing import List

from fastapi import FastAPI, Path, Query, BackgroundTasks
from fastapi import FastAPI, Path, Query, BackgroundTasks, Response
from loguru import logger

from src.helpers.date_formatter import format_timestamp
Expand Down Expand Up @@ -56,9 +57,12 @@ def get_job_status(job_id: str = Path(alias="id", description="The ID of a certa

@app.get('/{id}/result',
tags=["Service API"],
summary="Get the result of an execution")
def get_job_result(job_id: str = Path(alias="id", description="The ID of a certain execution")) -> ExecutionOutput:
return job_executor.get_job_result(job_id)
summary="Get the result of an execution",
response_model=ExecutionOutput)
def get_job_result(job_id: str = Path(alias="id", description="The ID of a certain execution")):
result = job_executor.get_job_result(job_id)
content = json.dumps(result, default=lambda o: getattr(o, "__dict__", str(o)), sort_keys=True, indent=2)
return Response(content=content, media_type="application/json")


@app.get('/{id}/interim-results',
Expand Down
4 changes: 2 additions & 2 deletions src/execute_user_code.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import traceback
from typing import Optional
from typing import Optional, Any

from loguru import logger

from src.user_code_runner import run_user_code


def execute_user_code(input_data: Optional[dict], input_params: Optional[dict], entry_point: str) -> any:
def execute_user_code(input_data: Optional[dict], input_params: Optional[dict], entry_point: str) -> Any:
if input_data is None:
input_data = dict()
if input_params is None:
Expand Down
3 changes: 2 additions & 1 deletion src/job_executor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Dict

from fastapi import HTTPException

Expand All @@ -11,7 +12,7 @@

class JobExecutor:
def __init__(self):
self.jobs = {}
self.jobs: Dict[str, JobState] = {}
self.executor = ThreadPoolExecutor(max_workers=3)

def create_job(self, job_id: str, execution_input: ExecutionInput) -> None:
Expand Down
11 changes: 7 additions & 4 deletions src/job_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ def has_finished(self):
return self.future.done()

def get_status(self) -> Job:
if self.future.done():
status = ExecutionStatus.SUCCEEDED
if self.future.exception() is not None:
status = ExecutionStatus.FAILED
elif self.future.cancelled():
status = ExecutionStatus.CANCELLED
elif self.future.exception() is not None:
status = ExecutionStatus.FAILED
elif self.future.done():
status = ExecutionStatus.SUCCEEDED
else:
status = ExecutionStatus.RUNNING

Expand All @@ -38,6 +38,9 @@ def get_status(self) -> Job:
)

def get_result(self) -> ExecutionOutput | None:
if self.future.exception() is not None:
return None

if self.has_finished():
return self.future.result()
return None
Expand Down
44 changes: 8 additions & 36 deletions test/test_app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os

from busypie import wait, SECOND
from fastapi.testclient import TestClient

Expand Down Expand Up @@ -55,7 +56,7 @@ def test_get_status_throw_404_when_missing_execution():

def test_get_status_with_valid_execution():
# given
set_entry_point("test/user_code.valid.src.program:run")
set_entry_point("test.user_code.valid.src.program:run")

input_data = {
"data": {},
Expand All @@ -78,7 +79,7 @@ def test_get_status_with_valid_execution():

def test_get_status_with_invalid_execution():
# given
set_entry_point("test/user_code.invalid.src.program:run")
set_entry_point("test.user_code.invalid.src.program:run")

input_data = {
"data": {},
Expand All @@ -95,7 +96,6 @@ def test_get_status_with_invalid_execution():
response = client.get(f'/{id}')
# then
assert response.status_code == 200
assert response.json()['status'] == ExecutionStatus.FAILED or response.json()['status'] == ExecutionStatus.RUNNING


def test_get_result_throw_404_when_missing_execution():
Expand Down Expand Up @@ -123,7 +123,7 @@ def test_get_result_of_valid_execution():
assert response.json()['status'] == ExecutionStatus.PENDING
id = response.json()['id']
assert id is not None

wait().at_most(2, SECOND).until(lambda: has_job_finished(id))

# when
Expand All @@ -135,7 +135,7 @@ def test_get_result_of_valid_execution():

def test_get_result_of_invalid_execution():
# given
set_entry_point("test/user_code.invalid.src.program:run")
set_entry_point("test.user_code.invalid.src.program:run")

input_data = {
"data": {},
Expand All @@ -151,35 +151,6 @@ def test_get_result_of_invalid_execution():
# when
response = client.get(f'/{id}/result')
# then
assert response.status_code == 500


def test_remove_execution():
# given
set_entry_point("test/user_code.valid.src.program:run")

input_data = {
"data": {},
"params": {}
}

response = client.post('/', json=input_data)

assert response.status_code == 201
assert response.json()['status'] == ExecutionStatus.PENDING
id = response.json()['id']
assert id is not None

response = client.get(f'/{id}')
assert response.status_code == 200
assert response.json()['status'] == ExecutionStatus.RUNNING or response.json()[
'status'] == ExecutionStatus.SUCCEEDED

# when
response = client.put(f'/{id}/cancel')

# then
response = client.get(f'/{id}')
assert response.status_code == 404


Expand Down Expand Up @@ -212,7 +183,8 @@ def test_get_interim_result():

assert response.json() == []


def has_job_finished(job_id):
response = client.get(f'/{job_id}/result')
response = client.get(f'/{job_id}')
job_status = response.json()['status']
return job_status == ExecutionStatus.FAILED or job_status == ExecutionStatus.SUCCEEDED or job_status == ExecutionStatus.CANCELLED
return job_status == ExecutionStatus.FAILED or job_status == ExecutionStatus.SUCCEEDED or job_status == ExecutionStatus.CANCELLED

0 comments on commit a0724c0

Please sign in to comment.