✨ NEW: Add ProcessLauncher.process_cache #213

Open · wants to merge 4 commits into master

Conversation

@chrisjsewell (Member) commented Mar 2, 2021

Currently, in aiida-core for a daemon runner:

  1. there is no easy way to retrieve the processes running on it (except to search gc.get_objects())
  2. running the same process twice is not identified until you hit a DuplicateSubscriberIdentifier exception (see Issues with new asyncio daemon (stress-test) aiida-core#4595)

So in this PR I propose to add a cache to the ProcessLauncher that keeps a (weak) reference of the processes it has launched, and raises a DuplicateProcess exception when trying to launch/continue a process that is already running.
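
Roughly, the idea is a weak-value mapping held by the launcher and checked before launching/continuing (a simplified, self-contained sketch, not the actual diff; the real changes live in plumpy/process_comms.py and plumpy/exceptions.py, and the helper name below is hypothetical):

import weakref


class DuplicateProcess(Exception):
    """Raised when a process with the same pid is already running (added to plumpy.exceptions in this PR)."""


class ProcessLauncher:
    """Simplified sketch of a launcher keeping weak references to the processes it has launched."""

    def __init__(self):
        # weak values: a terminated process drops out of the cache once it is garbage collected
        self._process_cache = weakref.WeakValueDictionary()

    @property
    def process_cache(self):
        """Mapping of pid -> process for the processes this launcher has launched/continued."""
        return self._process_cache

    def _check_not_duplicate(self, pid):
        # hypothetical helper; the PR performs this check inline in _launch and _continue
        cached = self._process_cache.get(pid)
        if cached is not None and not cached.has_terminated():
            raise DuplicateProcess(f'Process<{pid}> is already running')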

I envision potentially also using this in two places:

  1. In 👌 IMPROVE: verdi daemon status aiida-core#4769, where I am playing around with returning the number of processes running on each daemon, when calling verdi daemon status
  2. Together with ✨ NEW: Expose aio_pika.Connection.add_close_callback kiwipy#104, to create a callback that stops all processes currently running when the connection to RMQ is lost. Maybe something like:
def close_callback(sender, exc):
    processes = process_launcher.process_cache.values()
    # we want to immediately stop the process storing anything more to its node,
    # which could conflict with another worker that starts running it
    for process in processes:
        process._enable_persistence = False
    # we could then do something like kill it (knowing that the kill state will no longer be persisted),
    # although this would also send kill signals to its children,
    # which may be running on a different worker and so should not be killed
    for process in processes:
        process.kill()

@codecov (bot) commented Mar 2, 2021

Codecov Report

Merging #213 (69494c7) into develop (77e3820) will decrease coverage by 0.01%.
The diff coverage is 90.91%.


@@             Coverage Diff             @@
##           develop     #213      +/-   ##
===========================================
- Coverage    90.46%   90.45%   -0.00%     
===========================================
  Files           22       22              
  Lines         2976     2994      +18     
===========================================
+ Hits          2692     2708      +16     
- Misses         284      286       +2     
Impacted Files Coverage Δ
plumpy/process_comms.py 86.23% <90.48%> (+0.17%) ⬆️
plumpy/exceptions.py 100.00% <100.00%> (ø)

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 77e3820...69494c7.

muhrin previously approved these changes Mar 3, 2021

@muhrin (Collaborator) left a comment

Seems like a good move. From my side I'd be happy for this to go in.

@chrisjsewell (Member, Author)

thanks, I'll wait a bit to merge in case @sphuber wants to comment?

@sphuber (Collaborator) left a comment

Thanks @chrisjsewell. I think it is definitely useful to have this cache, but I have some questions about the implementation. One I added in the code; the second is: what happens when a process terminates? Shouldn't it be properly removed from the cache? Also, there is a risk of the cache going out of sync if a task gets sent back. At that point the cache should also be updated, because otherwise, if the task gets reloaded, it would unjustly hit the DuplicateProcess exception.

Comment on lines +591 to +594:

proc: Process = proc_class(*init_args, **init_kwargs)

if proc.pid in self._process_cache and not self._process_cache[proc.pid].has_terminated():
    raise exceptions.DuplicateProcess(f'Process<{proc.pid}> is already running')
Collaborator:

I don't think we actually use the "launch" part of the communicator but exclusively the "continue", so I don't think this will matter for our use-case, but isn't it a bit weird to be able to hit a duplicate process when launching it? When you launch it, it is the first time you are creating it, so the pid shouldn't already be running, as it can when continuing an existing process. I can see how there can still be a clash in generated process ids, but I think that the exception type, or at the very least the message, should be different from the one in _continue.

Member Author:

yeh as you mention, we don't use it, so this is only for completeness. I would say though, whether you continue or launch, if there are two processes with the same PID they are duplicates, so I disagree that the message should be any different.
(if you launch twice with the same PID, this is no different to continuing twice)

@sphuber (Collaborator) commented Mar 5, 2021:

Not quite sure I agree. I agree that the result of having a duplicate PID is the same; however, the origin would be very different and I think that is important to reflect. When you launch, a new PID is created and so it should be unique. If that is not the case, then the ID generating algorithm is fundamentally broken, which is completely different from the case in continue, where you can simply have requested to continue the same process twice.

Member Author:

just to have the last word lol

> however, the origin would be very different and I think that is important to reflect

but then you could see the origin in the traceback

> When you launch a new PID is created

this would not be the case if you specifically set the pid in init_args or init_kwargs

@chrisjsewell (Member, Author)

> what happens when a process terminates. Shouldn't it be properly removed from the cache?

nope, that's the point of using a WeakValueDictionary: once the process terminates it will no longer be referenced (which we have in principle ensured in aiidateam/aiida-core#4603) and so will be eligible for garbage collection. Once the process is garbage collected, it will simply disappear from the cache.
As noted in the process_cache docstring, the cache "guarantees" to contain all running processes, but it may still contain processes that have been terminated but not yet garbage collected. This is why in the code we don't just check whether the process is in the cache before raising a DuplicateProcess, but also whether it is in a terminal state.
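
For illustration, the behaviour described above can be seen with a plain WeakValueDictionary (a standalone example, not plumpy code):

import gc
import weakref


class DummyProcess:
    """Stand-in for a process object; only strong references keep it alive."""


cache = weakref.WeakValueDictionary()

proc = DummyProcess()
cache[1234] = proc
assert 1234 in cache          # present while something still holds a strong reference

del proc                      # e.g. the event loop drops the process once it terminates
gc.collect()                  # usually not even needed on CPython (reference counting)
assert 1234 not in cache      # the entry has silently disappeared from the cache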

@chrisjsewell (Member, Author) commented Mar 4, 2021

Note I don't think there is any way to always know exactly when a process should be removed from the cache, given that _continue can be called with nowait.
But perhaps there is a more robust way than checking for a terminal state?

@chrisjsewell (Member, Author)

@sphuber how about 2b6ef68, do you think that is sufficient?

sphuber previously approved these changes Mar 5, 2021

@sphuber (Collaborator) left a comment

Thanks @chrisjsewell with that addition this is fine for me to be merged. Note that I still don't agree on the same exception behavior being used in _launch and _continue as I describe in the comment thread, but it is not critical for now, so let's leave it.

Then to comment on your OP on how to stop running processes once a runner loses its connection. I think we might have to add a STOPPED state to the state machine that can be reached by calling stop() on a process, which will then stop all mutations of state. As you said, kill() is not what we are trying to do, and so hacking around the undesirable consequences seems the wrong way to go about it.
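
To make the contrast with the snippet in the OP concrete: with such a (currently hypothetical) Process.stop() and STOPPED state, the close callback could reduce to something like the sketch below, with no need to touch _enable_persistence or send kill signals to children.

def close_callback(sender, exc):
    # hypothetical: stop() would transition the process to a STOPPED state,
    # freezing any further state mutations without killing child processes
    for process in process_launcher.process_cache.values():
        if not process.has_terminated():
            process.stop()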

@sphuber (Collaborator) commented Mar 5, 2021

Actually, hold on with merging. Can I retract my approval?

Edit: I am now wondering about the correctness of the not self._process_cache[proc.pid].has_terminated() clause in the conditional that throws the DuplicateProcess exception. Doesn't this allow the situation where process A is run by runner A, which successfully runs it to completion, but somehow the runner receives the task to continue it again, and instead of excepting it will run the process again? Won't this be a problem? Or, in the case of continue, it will load the state from the serialized checkpoint on the node, which should then already be in a terminal state, and it will except there. At best that exception simply won't do anything as the node is already sealed, but it is kind of weird. Shouldn't we always just raise DuplicateProcess even if the process is already terminated? We don't care if it is still in the cache simply because it hasn't been garbage collected yet. A duplicate process id is a problem and should not be acted on by running the process again. What am I missing?

@sphuber self-requested a review March 5, 2021 09:04
@chrisjsewell (Member, Author)

> Actually, hold on with merging. Can I retract my approval?

nope, too late 😛 what's the issue?

@chrisjsewell (Member, Author) commented Mar 5, 2021

Hmm fair, ok let's game this out:

So RMQ loses the connection and now still has a "process continue" task to fulfil, and it tries to give that task to the same daemon worker that was already running it (and has been reconnected).

  • If that process finalised/terminated in the meantime, we want to "inform" RMQ of that, so it can clear the task. Perhaps in this case it should not raise an exception at all, and just return True or something, to clear the task gracefully? (maybe just with a logger warning that this has happened)
  • If the process is just running normally, then ideally we probably want to call something like a Process.stop() (as you suggest) and reject the task, so it can be re-queued
  • If we are able to implement the close_callback and "stop" all current Processes on a reconnection, the current Process may still be in a stage of stopping, or stopped but not yet removed from the cache. Here we might be happy to let the "duplicate" process commence, or we need to re-queue.

So perhaps we want to try implementing Process.stop before merging this, so we know what we are working with?
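
A rough sketch of how the three scenarios above might be distinguished inside a continue handler (entirely hypothetical: the function name, logger, and stop() do not exist in this form; process_cache and has_terminated() are from this PR):

import logging

LOGGER = logging.getLogger(__name__)


def handle_continue_task(process_cache, pid):
    """Hypothetical handler sketching the three scenarios above."""
    cached = process_cache.get(pid)
    if cached is not None:
        if cached.has_terminated():
            # scenario 1: the process already finished; acknowledge the task gracefully
            LOGGER.warning('duplicate continue task for terminated Process<%s>', pid)
            return True
        # scenarios 2/3: the process is still running (or still stopping);
        # stop it (hypothetical Process.stop) and reject the task so it can be re-queued
        cached.stop()
        raise RuntimeError(f'Process<{pid}> is already running; task should be re-queued')
    # otherwise continue the process from its checkpoint as usual
    ...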

@chrisjsewell (Member, Author) commented Mar 5, 2021

> If that process finalised/terminated in the meantime, we want to "inform" RMQ of that ...

In fact, in the aiida.ProcessLauncher._continue this is already done (by checking the node), so this is not really a problem

@ltalirz (Member) commented Mar 8, 2021

@sphuber Could you please comment on Chris's points so that we can move forward and get this merged?
This would allow for a proper fix of aiidateam/aiida-core#4648, which is delaying the 1.6 release.

@chrisjsewell (Member, Author) commented Mar 8, 2021

> So perhaps we want to try implementing Process.stop before merging this, so we know what we are working with?

note I am now working on this

as @ltalirz alludes to, using the process cache together with a Process.stop method (or something similar) would also be better to use for aiida-core's shutdown_runner method, rather than the current "brute-force" method of cancelling all async tasks.

@sphuber (Collaborator) commented Mar 8, 2021

> Hmm fair, ok let's game this out:
>
> So RMQ loses the connection and now still has a "process continue" task to fulfil, and it tries to give that task to the same daemon worker that was already running it (and has been reconnected).

Note, it might also give it to another daemon worker if more than one is active.

> If that process finalised/terminated in the meantime, we want to "inform" RMQ of that, so it can clear the task. Perhaps in this case it should not raise an exception at all, and just return True or something, to clear the task gracefully? (maybe just with a logger warning that this has happened)

This is already happening in ProcessLauncher._continue.

> If the process is just running normally, then ideally we probably want to call something like a Process.stop() (as you suggest) and reject the task, so it can be re-queued

Ideally, if a worker receives a task that it is already running, it would simply continue running the process and acknowledge the task once the process is done. This way there is no loss of work already done since the last checkpoint of the process. However, I am not sure if this is technically possible or whether it might come with potential for bugs in edge cases. Maybe the safest is indeed to stop the corresponding process and then start running from the last checkpoint. Is there a reason why it has to be rejected and re-queued, and the runner cannot simply continue to run it after having stopped it first and removed it from the cache?

> If we are able to implement the close_callback and "stop" all current Processes on a reconnection, the current Process may still be in a stage of stopping, or stopped but not yet removed from the cache. Here we might be happy to let the "duplicate" process commence, or we need to re-queue.

Can't we simply let the daemon runner except? Circus will then relaunch a new one and we would be sure that all tasks were properly requeued and there won't be possibilities for subtle inconsistencies with the process cache.

> So perhaps we want to try implementing Process.stop before merging this, so we know what we are working with?

It looks like we would be needing this anyway, so that would be great yeah.

@sphuber changed the base branch from develop to master April 8, 2022 17:31