Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace redis with multiprocessing.Queue for SIL interface #195

Merged
merged 3 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/sil_example.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
"metadata": {},
"source": [
"We use the FastAPI `app` to define the HTTP endpoint. You can read more on the\n",
"use of FastAPI [here](https://fastapi.tiangolo.com/tutorial/). Behind the scenes of the Vessim SiL Controller is a Redis database that holds shared memory for the simulation and the API server process. The Vessim Broker conveys between the DB and the user. To save a value in this DB, you can set an event with a key, value pair. In this case: `\"battery_min_soc\"` and `min_soc`."
"use of FastAPI [here](https://fastapi.tiangolo.com/tutorial/). Behind the scenes of the Vessim SiL Controller is a key-value database that holds shared memory for the simulation and the API server process. The Vessim Broker conveys between the DB and the user. To save a value in this DB, you can set an event with a key, value pair. In this case: `\"battery_min_soc\"` and `min_soc`."
]
},
{
Expand Down
4 changes: 1 addition & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ docopt-ng = "^0.6.2"
# Optional dependencies (software-in-the-loop)
requests = {version = "^2.26.0", optional = true}
fastapi = {version = "^0.104.0", optional = true}
docker = {version = "^7.0.0", optional = true}
redis = {version = "^5.0.0", optional = true}
uvicorn = {version = "^0.23.0", optional = true}

[tool.poetry.extras]
sil = ["requests", "fastapi", "docker", "redis", "uvicorn"]
sil = ["requests", "fastapi", "uvicorn"]

[tool.poetry.group.dev]
optional = true
Expand Down
9 changes: 4 additions & 5 deletions vessim/cosim.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import pickle
from copy import copy
from typing import Optional, Literal

Expand Down Expand Up @@ -55,11 +54,11 @@ def __init__(
actor_entity, controller_entity, ("state", f"actor.{actor_name}")
)

def pickle(self) -> bytes:
def __getstate__(self) -> dict:
"""Returns a Dict with the current state of the microgrid for monitoring."""
cp = copy(self)
cp.controllers = [] # controllers are not needed and often not pickleable
return pickle.dumps(cp)
state = copy(self.__dict__)
state["controllers"] = [] # controllers are not needed and often not pickleable
return state

def finalize(self):
"""Clean up in case the simulation was interrupted.
Expand Down
173 changes: 89 additions & 84 deletions vessim/sil.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,17 @@

from __future__ import annotations

import json
import multiprocessing
import pickle
from queue import Empty as QueueEmpty
from collections import defaultdict
from datetime import datetime, timedelta
from threading import Thread
from time import sleep
from typing import Any, Optional, Callable
from typing import Any, Optional, Callable, Iterable

import docker # type: ignore
import pandas as pd
import redis
import requests
import uvicorn
from docker.models.containers import Container # type: ignore
from fastapi import FastAPI
from loguru import logger
from requests.auth import HTTPBasicAuth
Expand All @@ -29,30 +25,76 @@
from vessim._util import DatetimeLike


def _iterate_queue(q: multiprocessing.Queue, timeout: Optional[float] = None) -> Iterable[Any]:
blocking = timeout is not None
while True:
try:
yield q.get(blocking, timeout)
except QueueEmpty:
break


class Broker:
def __init__(self):
self.redis_db = redis.Redis()
def __init__(self, queue_size: int = 0):
# Note: Any objects put onto queues are automatically pickled and depickled when retrieved.
self._outgoing_events_queue: Optional[multiprocessing.Queue] = multiprocessing.Queue(
maxsize=queue_size
)
self._incoming_data_queue: Optional[multiprocessing.Queue] = multiprocessing.Queue(
maxsize=queue_size
)
self._microgrid: Optional[Microgrid] = None
self._actor_infos: Optional[dict] = None
self._p_delta: Optional[float] = None

def get_microgrid(self) -> Microgrid:
return pickle.loads(self.redis_db.get("microgrid")) # type: ignore
self._process_incoming_data()
assert self._microgrid is not None
return self._microgrid

def get_actor(self, actor: str) -> dict:
return json.loads(self.redis_db.get("actors"))[actor] # type: ignore
self._process_incoming_data()
assert self._actor_infos is not None
return self._actor_infos
marvin-steinke marked this conversation as resolved.
Show resolved Hide resolved

def get_p_delta(self) -> float:
return float(self.redis_db.get("p_delta")) # type: ignore
self._process_incoming_data()
assert self._p_delta is not None
return self._p_delta

def set_event(self, category: str, value: Any) -> None:
self.redis_db.lpush(
"set_events",
pickle.dumps(
dict(
category=category,
time=datetime.now(),
value=value,
)
),
)
if self._outgoing_events_queue is not None:
self._outgoing_events_queue.put(
{
"category": category,
"time": datetime.now(),
"value": value,
}
)

def _add_microgrid_data(self, time: datetime, data: dict) -> None:
if self._incoming_data_queue is not None:
self._incoming_data_queue.put((time, data))

def _consume_events(self) -> Iterable[dict]:
if self._outgoing_events_queue is not None:
yield from _iterate_queue(self._outgoing_events_queue)

# TODO-now note that this should really be run periodically
def _process_incoming_data(self) -> None:
if self._incoming_data_queue is not None:
for time, data in _iterate_queue(self._incoming_data_queue):
self._microgrid = data.pop("microgrid", self._microgrid)
self._actor_infos = data.pop("actor_infos", self._actor_infos)
self._p_delta = data.pop("p_delta", self._p_delta)

def _finalize(self) -> None:
assert self._outgoing_events_queue is not None
self._outgoing_events_queue.close()
self._outgoing_events_queue = None
assert self._incoming_data_queue is not None
self._incoming_data_queue.close()
self._incoming_data_queue = None


class SilController(Controller):
Expand All @@ -75,108 +117,71 @@ def __init__(
self.api_port = api_port
self.request_collector_interval = request_collector_interval
self.kwargs = kwargs
self.redis_docker_container = _redis_docker_container()
self.redis_db = redis.Redis()
self.broker = Broker()

self.microgrid: Optional[Microgrid] = None

def start(self, microgrid: Microgrid) -> None:
self.microgrid = microgrid
name = f"Vessim API for microgrid {id(self.microgrid)}"

multiprocessing.Process(
target=_serve_api,
name="Vessim API",
name=name,
daemon=True,
kwargs=dict(
api_routes=self.api_routes,
api_host=self.api_host,
api_port=self.api_port,
broker=self.broker,
grid_signals=self.grid_signals,
),
).start()
logger.info("Started SiL Controller API server process 'Vessim API'")
logger.info(f"Started SiL Controller API server process '{name}'")

Thread(target=self._collect_set_requests_loop, daemon=True).start()

def step(self, time: datetime, p_delta: float, actor_infos: dict) -> None:
pipe = self.redis_db.pipeline()
pipe.set("time", time.isoformat())
pipe.set("p_delta", p_delta)
pipe.set("actors", json.dumps(actor_infos))
assert self.microgrid is not None
pipe.set("microgrid", self.microgrid.pickle())
pipe.execute()
self.broker._add_microgrid_data(
time,
{
"microgrid": self.microgrid,
"actor_infos": actor_infos,
"p_delta": p_delta,
},
)

def finalize(self) -> None:
if self.redis_docker_container is not None:
self.redis_docker_container.stop()
logger.info("Shut down Redis docker container")
self.broker._finalize()

def _collect_set_requests_loop(self):
while True:
events = self.redis_db.lrange("set_events", start=0, end=-1)
assert events is not None
if len(events) > 0: # type: ignore
events = [pickle.loads(e) for e in events] # type: ignore
events_by_category = defaultdict(dict)
for event in events:
events_by_category[event["category"]][event["time"]] = event["value"]
for category, events in events_by_category.items():
self.request_collectors[category](
events=events_by_category[category],
microgrid=self.microgrid,
kwargs=self.kwargs,
)
self.redis_db.delete("set_events")
events_by_category = defaultdict(dict)
for event in self.broker._consume_events():
events_by_category[event["category"]][event["time"]] = event["value"]
for category, events in events_by_category.items():
self.request_collectors[category](
events=events_by_category[category],
microgrid=self.microgrid,
kwargs=self.kwargs,
)
sleep(self.request_collector_interval)


def _serve_api(
api_routes: Callable,
api_host: str,
api_port: int,
broker: Broker,
grid_signals: dict[str, Signal],
):
app = FastAPI()
api_routes(app, Broker(), grid_signals)
api_routes(app, broker, grid_signals)
config = uvicorn.Config(app=app, host=api_host, port=api_port, access_log=False)
server = uvicorn.Server(config=config)
server.run()


def _redis_docker_container(
docker_client: Optional[docker.DockerClient] = None, port: int = 6379
) -> Container:
"""Initializes Docker client and starts Docker container with Redis."""
if docker_client is None:
try:
docker_client = docker.from_env()
except docker.errors.DockerException as e: # type: ignore
raise RuntimeError("Could not connect to Docker.") from e
try:
container = docker_client.containers.run(
"redis:latest",
ports={"6379/tcp": port},
detach=True, # run in background
)
except docker.errors.APIError as e: # type: ignore
if e.status_code == 500 and "port is already allocated" in e.explanation:
# TODO prompt user to automatically kill container
raise RuntimeError(
f"Could not start Redis container as port {port} is "
f"already allocated. Probably a prevois execution was not "
f"cleaned up properly by Vessim."
) from e
raise

# Check if the container has started
while True:
container_info = docker_client.containers.get(container.name) # type: ignore
if container_info.status == "running": # type: ignore
break
sleep(1)

return container # type: ignore
broker._finalize()


def get_latest_event(events: dict[datetime, Any]) -> Any:
Expand Down
Loading