Skip to content

Commit

Permalink
feat: funman taskrunner (#3375)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwdchang authored Apr 22, 2024
1 parent 525eac9 commit 6da7c61
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 3 deletions.
17 changes: 14 additions & 3 deletions docker-bake.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ function "check_suffix" {

# ---------------------------------
group "prod" {
targets = ["hmi-client", "hmi-server", "db-migration", "gollm-taskrunner", "mira-taskrunner"]
targets = ["hmi-client", "hmi-server", "db-migration", "gollm-taskrunner", "mira-taskrunner", "funman-taskrunner"]
}

group "staging" {
targets = ["hmi-client", "hmi-server", "db-migration", "gollm-taskrunner", "mira-taskrunner"]
targets = ["hmi-client", "hmi-server", "db-migration", "gollm-taskrunner", "mira-taskrunner", "funman-taskrunner"]
}

group "default" {
targets = ["hmi-client-base", "hmi-server-base", "db-migration-base", "gollm-taskrunner-base", "mira-taskrunner-base"]
targets = ["hmi-client-base", "hmi-server-base", "db-migration-base", "gollm-taskrunner-base", "mira-taskrunner-base", "funman-taskrunner-base"]
}

# ---------------------------------
Expand Down Expand Up @@ -91,3 +91,14 @@ target "mira-taskrunner-base" {
target "mira-taskrunner" {
inherits = ["_platforms", "mira-taskrunner-base"]
}

# Base build target for the funman taskrunner image.
# The build context is the repository root because Dockerfile.funman copies
# from more than one package directory (packages/taskrunner and packages/funman).
target "funman-taskrunner-base" {
context = "." # root of the repo
dockerfile = "./packages/taskrunner/docker/Dockerfile.funman"
tags = tag("funman-taskrunner", "", "")
}

# Variant of funman-taskrunner-base that additionally inherits the shared
# "_platforms" target (defined outside this excerpt -- presumably the
# multi-platform settings; confirm there). This is the target listed in the
# "prod" and "staging" groups.
target "funman-taskrunner" {
inherits = ["_platforms", "funman-taskrunner-base"]
}

170 changes: 170 additions & 0 deletions packages/funman/core/taskrunner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import json
import signal
import sys
import concurrent.futures
import argparse
import os
import time
import threading
from typing import Callable


# Maximum number of bytes requested per read() call when draining the input
# pipe (1 MiB).
READ_CHUNK_SIZE = 1024*1024


class SelfDestructThread:
    """Watchdog that hard-kills the current process after a timeout.

    The watchdog waits on an event for ``self_destruct_timeout_seconds``. If
    ``stop()`` has not been called by then, the process sends itself SIGKILL.

    The thread is a daemon so that an armed watchdog does not, by itself, keep
    the interpreter alive until the timeout once the main thread has finished.
    It still fires while the interpreter is waiting on other (non-daemon)
    threads that are stuck.
    """

    def __init__(self, self_destruct_timeout_seconds):
        """Create (but do not start) the watchdog.

        Args:
            self_destruct_timeout_seconds: Seconds to wait before killing the
                process.
        """
        self.self_destruct_timeout_seconds = self_destruct_timeout_seconds
        self.stop_destruct_event = threading.Event()
        self.thread = threading.Thread(
            target=self._kill_after_timeout, daemon=True
        )

    def start(self):
        """Arm the watchdog."""
        self.thread.start()

    def stop(self):
        """Disarm the watchdog and wait for its thread to finish.

        Safe to call when the watchdog was never started or already stopped.
        """
        self.stop_destruct_event.set()
        # join() raises RuntimeError on a thread that was never started.
        if self.thread.is_alive():
            self.thread.join()

    def _kill_after_timeout(self):
        """Thread body: wait for stop() or the timeout, then self destruct."""
        self.stop_destruct_event.wait(self.self_destruct_timeout_seconds)
        if not self.stop_destruct_event.is_set():
            # If for whatever reason this process is still around, do our best
            # to self destruct. SIGKILL (not SIGTERM) is used so that no signal
            # handler can intercept or delay the shutdown.
            os.kill(os.getpid(), signal.SIGKILL)


class TaskRunnerInterface:
    """Command-line and pipe protocol between a Python task and the taskrunner.

    The constructor parses the process arguments. Input is taken either
    inline (``--input``) or from a named pipe (``--input_pipe``); output and
    progress go to named pipes when given and to stdout otherwise. A
    self-destruct watchdog is started on construction and must be disarmed
    with ``shutdown()``.
    """

    def __init__(self, description: str):
        """Parse command-line arguments and start the self-destruct timer.

        Args:
            description: Text shown in the argparse help output.

        Raises:
            ValueError: If neither ``--input`` nor ``--input_pipe`` is given.
        """
        parser = argparse.ArgumentParser(description=description)
        parser.add_argument(
            "--id", type=str, required=True, help="The id of the request"
        )
        parser.add_argument(
            "--input",
            type=str,
            required=False,
            help="The input payload as a JSON string",
        )
        parser.add_argument(
            "--input_pipe",
            type=str,
            required=False,
            help="The name of the input pipe",
        )
        parser.add_argument(
            "--output_pipe",
            type=str,
            required=False,
            help="The name of the output pipe",
        )
        parser.add_argument(
            "--progress_pipe",
            type=str,
            required=False,
            help="The name of the progress pipe",
        )
        parser.add_argument(
            "--self_destruct_timeout_seconds",
            type=int,
            default=60 * 60 * 24,
            required=False,
            help="Process self destruct timeout in seconds",
        )
        args = parser.parse_args()
        self.id = args.id
        self.input = args.input
        self.input_pipe = args.input_pipe
        self.output_pipe = args.output_pipe
        self.progress_pipe = args.progress_pipe
        self.has_written_output = False

        if self.input is None and self.input_pipe is None:
            raise ValueError("Either `input` or `input_pipe` must be specified")

        # Start the self destruct timer
        self.self_destructor = SelfDestructThread(args.self_destruct_timeout_seconds)
        self.self_destructor.start()

    def log(self, msg: str):
        """Print ``msg`` to stdout, flushing immediately."""
        print(msg, flush=True)

    def shutdown(self):
        """Disarm the self-destruct timer."""
        self.self_destructor.stop()

    @staticmethod
    def _run_with_timeout(func, timeout_seconds, timeout_message, *args):
        """Run ``func(*args)`` on a worker thread, giving up after a timeout.

        The executor is deliberately not used as a context manager: leaving a
        ``with`` block calls ``shutdown(wait=True)``, which would block on the
        stuck worker and keep the TimeoutError from ever reaching the caller.

        Raises:
            TimeoutError: With ``timeout_message`` if ``func`` does not finish
                within ``timeout_seconds``.
        """
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(func, *args)
            try:
                return future.result(timeout=timeout_seconds)
            except concurrent.futures.TimeoutError:
                raise TimeoutError(timeout_message) from None
        finally:
            # Do not wait for a worker that may be blocked on a pipe.
            executor.shutdown(wait=False)

    def read_input_with_timeout(self, timeout_seconds: int = 30) -> str:
        """Return the raw task input as a string.

        The value of ``--input`` is returned as-is when it was supplied;
        otherwise the input pipe is read to EOF and decoded as UTF-8. Both
        paths return the undecoded JSON text, so callers parse it themselves.

        Raises:
            TimeoutError: If the pipe cannot be read within ``timeout_seconds``.
        """
        def read_input() -> str:
            self.log("Reading input from input pipe")
            with open(self.input_pipe, "rb") as f:
                # read() returns b"" at EOF, which ends the iteration.
                chunks = list(iter(lambda: f.read(READ_CHUNK_SIZE), b""))
            return b"".join(chunks).decode("utf-8")

        if self.input is not None:
            self.log("Reading input from input argument")
            return self.input

        return self._run_with_timeout(
            read_input, timeout_seconds, "Reading from input pipe timed out"
        )

    def write_progress_with_timeout(self, progress: dict, timeout_seconds: int = 30):
        """Send a progress message as compact JSON.

        Writes to the progress pipe when one was given, otherwise prints the
        message to stdout.

        Raises:
            TimeoutError: If the pipe write does not finish in time.
        """
        def write_progress(progress_pipe: str, progress: dict):
            bs = json.dumps(progress, separators=(',', ':')).encode()
            with open(progress_pipe, 'wb') as f_out:
                f_out.write(bs)

        # if no progress pipe is specified, just print the progress to stdout
        if self.progress_pipe is None:
            self.log("Writing progress to stdout")
            print(json.dumps(progress))
            return

        try:
            return self._run_with_timeout(
                write_progress,
                timeout_seconds,
                'Writing to progress pipe timed out',
                self.progress_pipe,
                progress,
            )
        except TimeoutError:
            print('Writing to progress pipe {} timed out'.format(self.progress_pipe), flush=True)
            raise

    def write_output_with_timeout(self, output: dict, timeout_seconds: int = 30):
        """Write the final task output; may only be called once.

        With an output pipe, a ``{"done": true}`` progress message is sent
        first, then the JSON output is written to the pipe. Without one, the
        output is printed to stdout.

        Raises:
            ValueError: If output has already been written.
            TimeoutError: If a pipe write does not finish in time.
        """
        def write_output(output: dict):
            self.log("Writing output to output pipe")
            bs = json.dumps(output).encode("utf-8")
            with open(self.output_pipe, "wb") as f_out:
                f_out.write(bs)

        # output should only be written once
        if self.has_written_output:
            raise ValueError("Output has already been written")

        self.has_written_output = True

        # if no output pipe is specified, just print the output to stdout
        if self.output_pipe is None:
            self.log("Writing output to stdout")
            print(json.dumps(output))
            return

        # signal to the taskrunner that it should stop consuming progress
        self.write_progress_with_timeout({'done': True}, timeout_seconds)

        # otherwise use the output pipe
        return self._run_with_timeout(
            write_output, timeout_seconds, "Writing to output pipe timed out", output
        )

    def on_cancellation(self, func: Callable):
        """Register ``func`` to run on SIGINT/SIGTERM before exiting with 1."""
        def signal_handler(sig, frame):
            func()
            sys.exit(1)

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)
14 changes: 14 additions & 0 deletions packages/funman/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from setuptools import setup, find_packages

# Packaging metadata for the funman task scripts.
# install_requires is empty, so no dependencies are pulled in by installing
# this package; Dockerfile.funman installs funman itself in earlier steps.
# The single console script maps the command name
# "funman_task:validate_modelconfig" to tasks.validate_modelconfig:main.
setup(
name="funman_task",
version="0.1.0",
packages=find_packages(),
install_requires=[],
entry_points={
"console_scripts": [
"funman_task:validate_modelconfig=tasks.validate_modelconfig:main",
],
},
python_requires=">=3.8",
)
108 changes: 108 additions & 0 deletions packages/funman/tasks/validate_modelconfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import sys
import os
import json
from core.taskrunner import TaskRunnerInterface

# Funman imports
from funman import Funman
from funman.config import FUNMANConfig
from funman.model.model import _wrap_with_internal_model
from funman.scenario.scenario import AnalysisScenario
from funman.model.petrinet import PetrinetModel
from funman.representation.parameter_space import ParameterSpace
from funman.server.query import (
FunmanProgress,
FunmanResults,
FunmanWorkUnit,
)
import pydantic
from pydantic import TypeAdapter
from funman.model.generated_models.petrinet import Model as GeneratedPetrinet


# FIXME: placeholder id passed to both FunmanResults and FunmanWorkUnit in
# run_validate -- presumably should become the real request id; confirm.
dummy_id = "xyz"

# Pydantic adapter used to validate a raw model payload against the generated
# Petrinet model class.
adapter = TypeAdapter(GeneratedPetrinet)
# NOTE(review): never reassigned in this module; run_validate binds a local
# variable with the same name instead. Confirm whether this global is needed.
current_results = None

def cleanup() -> None:
    """Cancellation hook: announce on stdout that the task is cleaning up."""
    message = "Task cleanup"
    print(message)

def run_validate(model: PetrinetModel, request):
    """Run the Funman solver for ``model``/``request`` and return the results.

    Args:
        model: The model to analyse, already wrapped for Funman.
        request: The request payload handed to FunmanResults and
            FunmanWorkUnit unchanged.

    Returns:
        The finalized FunmanResults serialized with
        ``model_dump_json(by_alias=False)``.
    """
    current_results = FunmanResults(
        id=dummy_id,
        model=model,
        request=request,
        parameter_space=ParameterSpace(),
    )

    # Update callback
    def update_current_results(scenario: AnalysisScenario, results: ParameterSpace) -> FunmanProgress:
        progress = current_results.update_parameter_space(scenario, results)
        print("update hook", progress)
        # Hand the progress back so the callback matches its declared return
        # type instead of silently returning None to the solver.
        return progress

    # Invoke solver
    work = FunmanWorkUnit(id=dummy_id, model=model, request=request)
    f = Funman()
    scenario = work.to_scenario()
    config = (
        FUNMANConfig()
        if work.request.config is None
        else work.request.config
    )
    # NOTE: no halt event is passed, so the solve cannot be interrupted
    # through the solver's own mechanism from here.
    result = f.solve(
        scenario,
        config=config,
        resultsCallback=lambda results: update_current_results(scenario, results),
    )
    print("Done solver portion")
    current_results.finalize_result(scenario, result)

    # Serialize once and reuse the string for both the log line and the return.
    results_json = current_results.model_dump_json(by_alias=False)
    print(results_json)

    return results_json


def taskrunner_wrapper():
    """Run the validation task under the taskrunner protocol.

    Reads the input payload, validates and wraps its ``"model"`` entry, runs
    the solver with its ``"request"`` entry, and writes
    ``{"response": <result json>}`` as the task output.

    Any exception is reported on stderr and turns into exit status 1. The
    taskrunner's self-destruct timer is always disarmed before returning.
    """
    print("Taskrunner wrapper")
    exit_code = 0
    taskrunner = None
    try:
        taskrunner = TaskRunnerInterface(description="Validate model configuration")
        taskrunner.on_cancellation(cleanup)

        # Input wrangling: accept either raw JSON text or an already-parsed
        # dict, since the input may arrive in either form.
        data = taskrunner.read_input_with_timeout()
        data_json = data if isinstance(data, dict) else json.loads(data)

        # Create work unit
        model = adapter.validate_python(data_json["model"])
        model = _wrap_with_internal_model(model)
        request = data_json["request"]
        result = run_validate(model, request)
        taskrunner.write_output_with_timeout({"response": result})

    except Exception as e:
        sys.stderr.write(f"Error: {str(e)}\n")
        sys.stderr.flush()
        exit_code = 1
    finally:
        # Disarm the self-destruct timer; taskrunner is None when argument
        # parsing or construction failed.
        if taskrunner is not None:
            taskrunner.shutdown()

    if exit_code != 0:
        # Surface the failure to the parent process instead of exiting 0.
        sys.exit(exit_code)


def debug_wrapper():
    """Run the validation against a local fixture file, bypassing the taskrunner.

    Loads ``funman-apr-12.json`` from the current working directory and feeds
    its ``"model"`` and ``"request"`` entries to ``run_validate``.
    """
    # Context manager so the file handle is closed even if parsing fails.
    with open("funman-apr-12.json", "r", encoding="utf-8") as f:
        test = json.load(f)
    model = adapter.validate_python(test["model"])
    model = _wrap_with_internal_model(model)
    request = test["request"]
    run_validate(model, request)


def main():
    """Entry point: run the debug path when TASKRUNNER_DEBUG is "1", else the taskrunner path."""
    debug_requested = os.getenv("TASKRUNNER_DEBUG") == "1"
    entry_point = debug_wrapper if debug_requested else taskrunner_wrapper
    entry_point()


if __name__ == "__main__":
main()
49 changes: 49 additions & 0 deletions packages/taskrunner/docker/Dockerfile.funman
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Build the Spring Boot application
FROM eclipse-temurin:17.0.10_7-jdk-focal AS taskrunner_builder

WORKDIR /taskrunner

COPY ./packages/taskrunner .

RUN ./gradlew bootJar


# ------------------------------------------------------------------------------

# Funman-base, should contain dreal4, ibex, and python dependencies
FROM ghcr.io/darpa-askem/funman-base:latest-e5fb635757aa57007615a75371f55dd4a24851e0



# Install OpenJDK JRE
RUN apt-get update && \
apt-get install -y --no-install-recommends openjdk-17-jre-headless && \
rm -rf /var/lib/apt/lists/*


# Copy the Spring Boot fat JAR from the builder image
COPY --from=taskrunner_builder /taskrunner/build/libs/*.jar /taskrunner.jar

# Copy the echo script for testing
COPY ./packages/taskrunner/src/test/resources/echo.py /echo.py


ADD https://api.github.com/repos/DARPA-ASKEM/funman-api/git/refs/heads/develop version.json
RUN git clone https://github.com/DARPA-ASKEM/funman-api.git

WORKDIR funman-api

RUN pip install .
RUN pip install auxiliary_packages/funman_dreal
RUN pip install auxiliary_packages/funman_demo

# Copy the funman package
COPY ./packages/funman /funman_tasks
WORKDIR /funman_tasks

# Install the funman tasks
RUN pip install -e .

WORKDIR /

CMD ["java", "-jar", "taskrunner.jar"]

0 comments on commit 6da7c61

Please sign in to comment.