Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace redis with multiprocessing.Queue for SIL interface #195

Merged
merged 3 commits into from
Apr 4, 2024

Conversation

Impelon
Copy link
Contributor

@Impelon Impelon commented Mar 24, 2024

As discussed with @birnbaum, I tried replacing the redis-container based implementation in sil.py with another option that does not require external installations (e.g. of docker).

I have two prototypes:

  • this one, which is based upon multiprocessing.Queue

  • a prototype which is based upon sqlite3

    That would have allowed more complex queries upon past values, too. Unfortunately it has some problems.
    • Using sqlite3 makes the code more complex.
    • sqlite3 apparently does not work well with multiple processes, only with multiple threads.
      • This means it is not possible to use in-memory databases, because the memory is not shared across processes. One needs to use a temporary file.
      • Timeouts have to be set quite high, to deal with potential write locks of the database.

Either way, let's probably stick with this approach :) I've run the tests workflow in my forked repo, it is already a functional replacement for the previous implementation.

Here are some open discussion points: These are all addressed by #199.

  • I would like to add the ability for the Broker to access previous values of e.g. p_delta, otherwise the SIL interface would not be very useful in my case. One approach would be to use an sqlite database for this (just locally for the API process, not for communicating the data across the processes). Alternatively, one could just use a regular python list to store the data and use the builtin bisect for fast lookup of dates. Those are two approaches I came up with and have code for I can reuse to quickly implement this. Open to any other suggestions on how to do this, or to discuss if this is needed in the main repository at all.

  • (Please see next discussion point first, as this one could become redundant.) As you can see in the smaller commit, I would like to make Microgrid's representation for pickling implicit using __get_state__.

    I wasted quite some hours wondering why this queue-based implementation does not work, noticing way too late it was because I called pickle.dumps(microgrid) instead of microgrid.pickle().
    This even lead me to give up on this attempt and trying out sqlite for this purpose instead. When I run into the same issue with sqlite, I noticed that the multiprocessing.Queue approach actually worked perfectly.
    Except for me accidentally trying to pickle the queues themselves, because they were included in the pickled representation of the microgrid.

    I do not really see the point in having a separate public method pickle() instead of using the __get_state__ mechanism. Is there a reasoning behind that?

  • A typical example of a Microgrid consumes a lot of memory for its pickled representation. This becomes a problem if one wants to provide access to past values.
    It looks like this is because a Generator's HistoricalSignal is also included in the pickled representation. It seems like actors should also be removed from the pickled representation.
    But actually, the question becomes if we need to pass the whole microgrid to the Broker anyways? The API cannot modify it directly, and needs to use the set_event mechanism for modification regardless. The only meaningful data that can be queried from the Microgrid object directly is the trivial step_size, and Storage + StoragePolicy objects. Both of these already have state methods akin to Actors. I would actually be in favour of removing the pickling ability of Microgrid and extracting this relevant information manually to be added to the queue.

    Memory footprint example here:

    Let's take the following simple setup:
    ```python3
    environment.add_microgrid(
        actors=[
            ComputingSystem(power_meters=[MockPowerMeter(name="mock", p=50)]),
            Generator(signal=HistoricalSignal.from_dataset("solcast2022_global"), column="Berlin"),
        ],
        storage=SimpleBattery(capacity=100),
        controllers=[sil_controller],
        step_size=10,
    )
    ```
    If we take `state` to be the microgrid's `__dict__` attribute, as used by pickle:
    ```python3
    >>> print("microgrid state pickledsize:", sys.getsizeof(pickle.dumps(state)))
    microgrid state pickledsize: 27101093
    >>> print("microgrid components pickledsize:", {k: f"{sys.getsizeof(pickle.dumps(v)) + sys.getsizeof(pickle.dumps(k))}b" for k, v in state.items()})
    microgrid components pickledsize: {'actors': '27100991b', 'controllers': '97b', 'storage': '195b', 'storage_policy': '99b', 'step_size': '95b'}
    >>> print("microgrid actors pickledsize:", {a.name: f"{sys.getsizeof(pickle.dumps(a))}b" for a in state["actors"]})
    microgrid actors pickledsize: {'ComputingSystem-0': '218b', 'Generator-0': '27100788b'}
    ```
    </details>
    
  • _process_incoming_data needs to run periodically from within the fastapi process. Right now it is only called on demand, when the broker is asked for data through e.g. get_p_delta.
    I am unsure how to make the fastapi-process call this regularily. I guess one could use a Thread spawned by that process, but perhaps there are some better ideas? Maybe uvicorn/fastapi even provide mechanisms for this. I found this, but it is in an extra library.
    Not calling this periodically can lead to the following problems:

    1. Many updates from _incoming_data_queue have to be processed by the broker at once, slowing down the API response.
    2. If the broker is never asked to provide this data, and thus the data in the queue is never processed, the queue will eventually run out of space (all multiprocessing.Queue have a max capacity that is limited through the operating system).

@marvin-steinke marvin-steinke self-assigned this Apr 3, 2024
@birnbaum
Copy link
Collaborator

birnbaum commented Apr 4, 2024

Hi, thanks a lot for the PR and sorry for getting back late! The multiprocessing queue implementation looks great!

  • What do you need past p_delta values for in the Broker? Just to understand the use case so we can think about how to do it right
  • Fully agree with the __get_state__() implementation
  • Passing the entire Microgrid was just the easiest thing to do for the first prototype, I agree that it has several drawbacks. Feel free to modify the code to just pass what you actually need, I think we'll need to iterate to see that makes the most sense here :)
  • Makes sense to me! I think either is fine, implementing it yourself using a Thread or using fastapi_utils. Maybe using the library is easier as it's less code we have to write ourselves :)

I'll merge this directly, feel free to issue new PRs for the other topics! Thanks :)

@birnbaum birnbaum marked this pull request as ready for review April 4, 2024 10:39
@birnbaum birnbaum self-requested a review April 4, 2024 10:39
@birnbaum birnbaum merged commit 8ea2035 into dos-group:main Apr 4, 2024
2 checks passed
@marvin-steinke
Copy link
Contributor

marvin-steinke commented Apr 4, 2024

Many updates from _incoming_data_queue have to be processed by the broker at once, slowing down the API response.

Do you think this problem will persist if the Microgrid was smaller in size? The queue should be performant enough to handle the amount of requests. We should also think about using pipes which are faster than queues by a fair bit.

If the broker is never asked to provide this data, and thus the data in the queue is never processed, the queue will eventually run out of space (all multiprocessing.Queue have a max capacity that is limited through the operating system).

We could simply empty the queue each time we read the most recent values, right?

@Impelon
Copy link
Contributor Author

Impelon commented Apr 4, 2024

What do you need past p_delta values for in the Broker? Just to understand the use case so we can think about how to do it right

I would like to provide an API that provides a time-series of e.g. battery state, local energy generation, etc. It does not have to have a super large time window for retaining these values, but continuously polling the API for new values of the e.g. battery does not seem sensible to me.
This does not necessarily have to be a feature of the Broker itself, but I think this is probably a reasonable/common use case.

Feel free to modify the code to just pass what you actually need, I think we'll need to iterate to see that makes the most sense here :)

Sounds reasonable, I will make a proposal!

implementing it yourself using a Thread or using fastapi_utils. Maybe using the library is easier as it's less code we have to write ourselves :)

Well, using a Thread it's probably just 3-6 lines of code, and threads/processes are using in the code already anyways. But I will think about ways that seem easier.

Many updates from _incoming_data_queue have to be processed by the broker at once, slowing down the API response.

Do you think this problem will persist if the Microgrid was smaller in size? The queue should be performant enough to handle the amount of requests.

To be fair, I have not tested this - running a load test to see whether this actually is a bottleneck - it is just a theoretical concern I had. I'm guessing for the use cases one would use vessim for, it is performant enough, even with the current size of the pickled Microgrid.

We should also think about using pipes which are faster than queues by a fair bit.

I had not considered this. But in theory it would work, since we only have 2 communicating processes. Under modern python versions, a pipe is still faster, but not by a lot apparently.
However, I think the main issue is that pipes are even more limited in capacity than queues (64kB on modern Linux), and sending any data after that would block. Using pipes would only work, if the API-process is always listening to the pipe, otherwise it would be a bottleneck, if I understand the situation correctly. I'm not sure the extra performance one would get would be worth the extra code one would need to write.

If the broker is never asked to provide this data, and thus the data in the queue is never processed, the queue will eventually run out of space (all multiprocessing.Queue have a max capacity that is limited through the operating system).

We could simply empty the queue each time we read the most recent values, right?

Right, but I think I misunderstand the question? The queue is already fully emptied each time any of get_p_delta/get_actor/get_microgrid is called. My point was, if we just run a vessim simulation that includes a SilController and never receive any API calls, the queue will eventually run out of capacity, crashing the simulation.

vessim/sil.py Show resolved Hide resolved
@marvin-steinke
Copy link
Contributor

Right, but I think I misunderstand the question? The queue is already fully emptied each time any of get_p_delta/get_actor/get_microgrid is called. My point was, if we just run a vessim simulation that includes a SilController and never receive any API calls, the queue will eventually run out of capacity, crashing the simulation.

You're right, how about we empty the queue every time before we write to it?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants