Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: funman taskrunner #3375

Merged
merged 12 commits into from
Apr 22, 2024
17 changes: 14 additions & 3 deletions docker-bake.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ function "check_suffix" {

# ---------------------------------
group "prod" {
targets = ["hmi-client", "hmi-server", "db-migration", "gollm-taskrunner", "mira-taskrunner"]
targets = ["hmi-client", "hmi-server", "db-migration", "gollm-taskrunner", "mira-taskrunner", "funman-taskrunner"]
}

group "staging" {
targets = ["hmi-client", "hmi-server", "db-migration", "gollm-taskrunner", "mira-taskrunner"]
targets = ["hmi-client", "hmi-server", "db-migration", "gollm-taskrunner", "mira-taskrunner", "funman-taskrunner"]
}

group "default" {
targets = ["hmi-client-base", "hmi-server-base", "db-migration-base", "gollm-taskrunner-base", "mira-taskrunner-base"]
targets = ["hmi-client-base", "hmi-server-base", "db-migration-base", "gollm-taskrunner-base", "mira-taskrunner-base", "funman-taskrunner-base"]
}

# ---------------------------------
Expand Down Expand Up @@ -91,3 +91,14 @@ target "mira-taskrunner-base" {
target "mira-taskrunner" {
inherits = ["_platforms", "mira-taskrunner-base"]
}

target "funman-taskrunner-base" {
context = "." # root of the repo
dockerfile = "./packages/taskrunner/docker/Dockerfile.funman"
tags = tag("funman-taskrunner", "", "")
}

target "funman-taskrunner" {
inherits = ["_platforms", "funman-taskrunner-base"]
}

133 changes: 133 additions & 0 deletions packages/funman/core/taskrunner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# FIXME: this is a copy from mira, should consolidate
import json
import signal
import sys
import concurrent.futures
import argparse
import os
import time
import threading
from typing import Callable


READ_CHUNK_SIZE = 1024*1024


class SelfDestructThread:
def __init__(self, self_destruct_timeout_seconds):
self.self_destruct_timeout_seconds = self_destruct_timeout_seconds
self.stop_destruct_event = threading.Event()
self.thread = threading.Thread(target=self._kill_after_timeout)

def start(self):
self.thread.start()

def stop(self):
self.stop_destruct_event.set()
self.thread.join()

def _kill_after_timeout(self):
self.stop_destruct_event.wait(self.self_destruct_timeout_seconds)
if not self.stop_destruct_event.is_set():
# if for whatever reason this process is still around, do our best to self destruct
os.kill(
os.getpid(), signal.SIGKILL
) # Send the SIGTERM signal to the current process


class TaskRunnerInterface:
mwdchang marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, description: str):
parser = argparse.ArgumentParser(description=description)
parser.add_argument(
"--id", type=str, required=True, help="The id of the request"
)
parser.add_argument(
"--input", type=str, required=False, help="The name of the input pipe"
)
parser.add_argument(
"--input_pipe", type=str, required=False, help="The name of the input pipe"
)
parser.add_argument(
"--output_pipe",
type=str,
required=False,
help="The name of the output pipe",
)
parser.add_argument(
"--self_destruct_timeout_seconds",
type=int,
default=60 * 60 * 24,
required=False,
help="Process self destruct timeout in seconds",
)
args = parser.parse_args()
self.id = args.id
self.input = args.input
self.input_pipe = args.input_pipe
self.output_pipe = args.output_pipe

if self.input is None and self.input_pipe is None:
raise ValueError("Either `input` or `input_pipe` must be specified")

# Start the self destruct timer
self.self_destructor = SelfDestructThread(args.self_destruct_timeout_seconds)
self.self_destructor.start()

def log(self, msg: str):
print(msg, flush=True)

def shutdown(self):
self.self_destructor.stop()

def read_input_with_timeout(self, timeout_seconds: int = 30):
def read_input() -> dict:
self.log("Reading input from input pipe")
chunks = []
with open(self.input_pipe, "rb") as f:
while True:
chunk = f.read(READ_CHUNK_SIZE)
if chunk == b"":
break
chunks.append(chunk)
return b"".join(chunks).decode("utf-8")

if self.input is not None:
self.log("Reading input from input argument")
return json.loads(self.input)

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(read_input)
try:
return future.result(timeout=timeout_seconds)
except concurrent.futures.TimeoutError:
raise TimeoutError("Reading from input pipe timed out")

def write_output_with_timeout(self, output: dict, timeout_seconds: int = 30):
def write_output(output: dict):
self.log("Writing output to output pipe")
bs = json.dumps(output).encode("utf-8")
with open(self.output_pipe, "wb") as f_out:
f_out.write(bs)
return

# if no output pipe is specified, just print the output to stdout
if self.output_pipe is None:
self.log("Writing output to stdout")
print(json.dumps(output))
return

# otherwise use the output pipe
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(write_output, output)
try:
return future.result(timeout=timeout_seconds)
except concurrent.futures.TimeoutError:
raise TimeoutError("Writing to output pipe timed out")

def on_cancellation(self, func: Callable):
def signal_handler(sig, frame):
func()
sys.exit(1)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
14 changes: 14 additions & 0 deletions packages/funman/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from setuptools import setup, find_packages

setup(
name="funman_task",
version="0.1.0",
packages=find_packages(),
install_requires=[],
entry_points={
"console_scripts": [
"funman_task:validate_modelconfig=tasks.validate_modelconfig:main",
],
},
python_requires=">=3.8",
)
108 changes: 108 additions & 0 deletions packages/funman/tasks/validate_modelconfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import sys
import os
import json
from core.taskrunner import TaskRunnerInterface

# Funman imports
from funman import Funman
from funman.config import FUNMANConfig
from funman.model.model import _wrap_with_internal_model
from funman.scenario.scenario import AnalysisScenario
from funman.model.petrinet import PetrinetModel
from funman.representation.parameter_space import ParameterSpace
from funman.server.query import (
FunmanProgress,
FunmanResults,
FunmanWorkUnit,
)
import pydantic
from pydantic import TypeAdapter
from funman.model.generated_models.petrinet import Model as GeneratedPetrinet


# FIXME
dummy_id = "xyz"

adapter = TypeAdapter(GeneratedPetrinet)
current_results = None

def cleanup():
print("Task cleanup")

def run_validate(model: PetrinetModel, request):
current_results = FunmanResults(
id=dummy_id,
model=model,
request=request,
parameter_space=ParameterSpace(),
)

# Update callback
def update_current_results(scenario: AnalysisScenario, results: ParameterSpace) -> FunmanProgress:
progress = current_results.update_parameter_space(scenario, results)
print("update hook", progress)


# Invoke solver
work = FunmanWorkUnit(id=dummy_id, model=model, request=request)
f = Funman()
scenario = work.to_scenario()
config = (
FUNMANConfig()
if work.request.config is None
else work.request.config
)
result = f.solve(
scenario,
config=config,
# haltEvent=self._halt_event,
resultsCallback=lambda results: update_current_results(scenario, results),
)
print("Done solver portion")
current_results.finalize_result(scenario, result)
print(current_results.model_dump_json(by_alias=False))

return current_results.model_dump_json(by_alias=False)


def taskrunner_wrapper():
print("Taskrunner wrapper")
try:
taskrunner = TaskRunnerInterface(description="AMR to MMT")
mwdchang marked this conversation as resolved.
Show resolved Hide resolved
taskrunner.on_cancellation(cleanup)

# Input wrangling
data = taskrunner.read_input_with_timeout()
data_json = json.loads(data)

# Create work unit
model = adapter.validate_python(test["model"])
model = _wrap_with_internal_model(model)
request = data_json["request"]
result = run_validate(model, request)
taskrunner.write_output_with_timeout({"response": result})

except Exception as e:
sys.stderr.write(f"Error: {str(e)}\n")
sys.stderr.flush()
exitCode = 1


def debug_wrapper():
f = open("funman-apr-12.json", "r")
test = json.loads(f.read())
model = adapter.validate_python(test["model"])
model = _wrap_with_internal_model(model)
request = test["request"]
run_validate(model, request)


def main():
if os.getenv("TASKRUNNER_DEBUG") == "1":
debug_wrapper()
else:
taskrunner_wrapper()


if __name__ == "__main__":
main()
49 changes: 49 additions & 0 deletions packages/taskrunner/docker/Dockerfile.funman
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Build the Spring Boot application
FROM eclipse-temurin:17.0.10_7-jdk-focal AS taskrunner_builder

WORKDIR /taskrunner

COPY ./packages/taskrunner .

RUN ./gradlew bootJar


# ------------------------------------------------------------------------------

# Funman-base, should contain dreal4, ibex, and python dependencies
FROM ghcr.io/darpa-askem/funman-base:latest-e5fb635757aa57007615a75371f55dd4a24851e0



# Install OpenJDK JRE
RUN apt-get update && \
apt-get install -y --no-install-recommends openjdk-17-jre-headless && \
rm -rf /var/lib/apt/lists/*


# Copy the Spring Boot fat JAR from the builder image
COPY --from=taskrunner_builder /taskrunner/build/libs/*.jar /taskrunner.jar

# Copy the echo script for testing
COPY ./packages/taskrunner/src/test/resources/echo.py /echo.py


ADD https://api.github.com/repos/DARPA-ASKEM/funman-api/git/refs/heads/develop version.json
RUN git clone https://github.com/DARPA-ASKEM/funman-api.git

WORKDIR funman-api

RUN pip install .
RUN pip install auxiliary_packages/funman_dreal
RUN pip install auxiliary_packages/funman_demo

# Copy the funman package
COPY ./packages/funman /funman_tasks
WORKDIR /funman_tasks

# Install the tasks for mira
RUN pip install -e .

WORKDIR /

CMD ["java", "-jar", "taskrunner.jar"]
Loading