Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[scheduler] Even more resilient perl beanstalkd annotation worker #545

Conversation

akotlar
Copy link
Collaborator

@akotlar akotlar commented Jul 8, 2024

  • This change makes it possible to have multiple annotation workers competing for jobs, even in the presence of network instability, and removes race conditions that could result in double submission and parallel workers writing to the same directory in the case of such instability. This change also makes Python beanstalkd workers more robust to network communication issues, such as application load balancer enforced connection interruptions on idle TCP connections.

  • Enables annotation jobs to run indefinitely, but while also making those jobs' TTR (time to run, see https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.txt) potentially as short as a few seconds. We now use a keep-alive mechanism on job submissions. This makes it possible to retry jobs faster if the worker that picked up the job times out.

  • bystro-annotate.pl and Seq/ packages now consistently output debug messages to STDERR, and you can now pipe the output from bystro-annotate.pl for downstream processing; the output is a JSON message containing the result paths and result summary:

  • Semaphores to make double writes impossible: Seq.pm takes an exclusive lock onbystro_annotation.lock file in the target output directory. If it cannot, annotation fails. This file lock is kept until annotation is completed.

    • Additionally, a bystron_annotation.completed file is written before the exclusive lock is released. Presence of this file is checked before annotation begins (after exclusive lock is acquired), to ensure that we do not accidentally re-attempt processing of an annotation after completion. The reason this is important is that in the case of submissions through the Bystro API server / UI, after the job is completed, the output files are used for automatic ancestry, search, and (potentially) PRS calculations. If an annotation worker re-attempts to write the annotation while indexing/ancestry/prs (and in the future) other workers are processing data, we have introduced a race condition that potentially leads to corruption (the annotation files could be in an incomplete state just before the indexer attempts to read them, or during read they could be modified, for instance).
  • Remove Perl Bystro Annotation server (bystro-server.pl) in favor of calling the bystro-annotate.pl command line program from the Python beanstalkd worker. This is done to consolidate beanstalkd communication code, to improve ease of debugging.

  • python/beanstalkd/worker.py: now runs handler_fn using ProcessPoolExecutor (instead of ThreadPoolExecutor), so as to not conflict with Ray's signal handler, and to make it possible to kill all handler_fn child processes.

  • python/beanstalkd/worker.py: kills the child processes associated with handler_fn if the job touch() fails with NOT_FOUND while the handler_fn child processes are still running. NOT_FOUND indicates that the job is no longer available to the worker for processing: at this point the worker should attempt to stop job processing (whatever is running in handler_fn), and must ensure that any other workers of the same kind that pick up the job would supersede it.

  • python/beanstalkd/worker.py: finer-grained error handling, and will not die on client timeout errors. Instead we sleep for 1s and re-attempt connection, indefinitely (or until the process is killed).

Example of new perl/bin/bystro-annotate.pl output:

(bystro) (base) ubuntu@ip-10-98-135-15:~/bystro/perl$ perl bin/bystro-annotate.pl --in ~/bystro/trio.trim.vep.vcf.gz --out test_tri/test --config ~/bystro/config/hg19.yml | jq
{
  "totalSkipped": 0,
  "totalProgress": 13341,
  "error": null,
  "results": {
    "annotation": "test.annotation.tsv",
    "config": "hg19.yml",
    "log": "test.annotation.log.txt",
    "sampleList": "test.sample_list",
    "dosageMatrixOutPath": "test.dosage.feather",
    "header": "test.annotation.header.json",
    "statistics": {
      "json": "test.statistics.json",
      "qc": "test.statistics.qc.tsv",
      "tab": "test.statistics.tsv"
    }
  }
}

Additional Background and Motivation:

  1. Previously, there was a small chance that operations on jobs like delete, release could stall the worker forever, if the beanstalkd server became unavailable in the microseconds between the job connection establishment and delete/release (we always connect before attempting those, so failure would have to occur between those two operations). Additionally, if the beanstalkd server became unavailable in the millisecond or so while those operations were running, the worker could stall.
  • The Python beanstalkd worker solves this because all operations on the socket respect the socket_timeout, whereas in Perl they do not and require us to implement client-side mechanisms
  1. Perl MCE (Many Cores Engine), at least in our hands, could not launch a 2nd parallel process for touching jobs (would conflict with launching the process pool for annotation), making it necessary to fork the annotation process anyway.

  2. By having the beanstalkd worker run bystro-annotate.pl, rather than calling Seq.pm directly, we ensure our command line interface gets use, and that we discover usability improvements, as we have done in this PR.

  3. Annotation jobs required long TTR (time to run) leases, because the annotation workers would not periodically touch the job to refresh the lease. If the client became unresponsive, say due to network outage, such that the job would not be completed or failed (or communication from the worker to beanstalkd server during delete/release operations failed), the job would only be retried after the TTR lease expired. Currently that lease is 48 hours. With this change jobs can run as long as needed, even with short TTRs, so that retrying the job after unresponsive client happened much faster (we could set the TTR to say 30 minutes).

@akotlar akotlar changed the title [WIP][scheduler] Even more resilient perl beanstalkd worker [WIP][scheduler] Even more resilient perl beanstalkd annotation worker Jul 9, 2024
@akotlar akotlar changed the title [WIP][scheduler] Even more resilient perl beanstalkd annotation worker [scheduler] Even more resilient perl beanstalkd annotation worker Jul 11, 2024
@akotlar akotlar mentioned this pull request Jul 11, 2024
31 tasks
@@ -51,7 +52,7 @@ has config => (
is => 'ro',
isa => 'Str',
coerce => 1,
required => 1,
required => 0,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer required, so that we can defer the check of whether we have config until we've seen whether we have json_config, which may define config.

@@ -41,15 +42,46 @@
logger = logging.getLogger(__name__)


class FunctionWrapper:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python ProcessPoolExecutor cannot serialize (to send to other processes) functions that are not defined as top-level functions (e.g. defined in this module, or imported from another module); the functions we're executing are instead passed to the listen() function.

To get around that, we use a custom serializer: cloud pickle, which is the serializer used by Ray, for the same purpose.

# Signal handler function
def kill_child_processes(parent_pid=os.getpid(), kill_parent=False):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this to ensure that the process thathandler_fn is running in, as well as any child processes handler_fn spawns are cleaned up either on program exit, or if touching the job fails with NOT_FOUND (or another exception), because if that happens the present worker no longer owns the job, and therefore the job is free to be picked up by another worker (applies when multiple workers are found; but even with a single worker we can get behavior that appears like multiple workers, because event message lifetimes are decoupled from submission message lifetimes).

@akotlar
Copy link
Collaborator Author

akotlar commented Jul 15, 2024

Since the main comment is busy:

  1. I've tested this extensively on bystro-dev, under a variety of workloads, and doing my best to break it, by using iptables to drop packets, restarting beanstalkd during submissions, using conntrack -D -p tcp --dport 11300 --timeout N to drop connections older than N seconds, etc. All works as expected. Workers, if they ever find themselves concurrently processing an annotation, which is only possible under issues of severe network fuckery, will result in the losing worker failing the job. It should also be possible under odd circumstances for the job to complete, but in either case, we will never get corruption from concurrent writes (unless FLOCK doesn't work on the filesystem). There is room for improvement here, but for now this is good enough for what should be an extreme corner case, and practically impossible with sufficiently long TTR
  • the only case we can have 2 workers try to simultaneously process a job is that the network goes down for long enough for the TTR lease of worker 1 to expire, causing worker 2 to pick up the job; it will take a short time for worker 1 to realize it no longer has the lease, and to kill the child task(s) that are doing the work requested in the job message.
  • currently worker 2 will see that the job is being processed, and fail the job
  • if worker 1 completed the job and sent the completion message before it realized it no longer had the lease, worker 2 would fail the job because it found the bystron_annotation.completed file. in the future we can relax this behavior, after verifying that all of the expected files are present (we can write the list of completed files, and their hashes in to bystron_annotation.completed, and have worker 2 verify that the files are present, have the expected contents, and send back a completed job with a qualification (e.g., put Duplicate submission, results verified and not modified in the message log for the job's BystroJobs DB record ).
  1. This is live on bystro-dev, with a very aggressive 40s TTR (to increase chance of race conditions for stress testing). Please try to submit jobs and break things.

perl/lib/Seq.pm Outdated
unlink($lockPath);
}
catch {
$self->log( 'warn', "Failed to close and delete lock file: $_" );
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to STDERR

Copy link
Collaborator

@cristinaetrv cristinaetrv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Walked through the overall process changes in miro (https://miro.com/app/board/uXjVKzUcjLM=/?share_link_id=838501974882), recommended to add workflow to documentation in the future.
Overall, lgtm!

@akotlar akotlar merged commit bdd3144 into bystrogenomics:master Jul 15, 2024
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants