-
Notifications
You must be signed in to change notification settings - Fork 2.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
15692: Introduce background jobs #16927
Merged
jeremystretch
merged 38 commits into
netbox-community:feature
from
alehaa:15692-background-jobs
Jul 30, 2024
Merged
Changes from all commits
Commits
Show all changes
38 commits
Select commit
Hold shift + click to select a range
5fab8e4
Introduce reusable BackgroundJob framework
alehaa 957bc3d
Restore using import_string for jobs
alehaa db591d4
Use SyncDataSourceJob for management command
alehaa 53a4420
Implement BackgroundJob for running scripts
alehaa 7fb1875
Fix documentation of model features
alehaa 212262d
Ensure consitent code style
alehaa 9dc6099
Introduce reusable ScheduledJob
alehaa 4880d81
Introduce reusable SystemJob
alehaa d78ddfc
Add documentation for jobs framework
alehaa fd8d537
Merge branch 'feature' into 15692-background-jobs
alehaa 15f888c
Revert "Use SyncDataSourceJob for management"
alehaa 7d15ec0
Merge enqueued status into JobStatusChoices
alehaa 9f1989c
Fix logger for ScriptJob
alehaa 257976d
Remove job name for scripts
alehaa 58089c7
Merge ScheduledJob into BackgroundJob
alehaa fb75389
Add name attribute for BackgroundJob
alehaa 654e6e7
Drop enqueue_sync_job() method from DataSource
jeremystretch 62380fb
Import ScriptJob directly
jeremystretch d6432fb
Relax requirement for Jobs to reference a specific object
jeremystretch b3f122a
Rename 'run_now' arg on Job.enqueue() to 'immediate'
jeremystretch 3e1cc1b
Merge branch '15692-cherry' into 15692-background-jobs
alehaa bcad8cf
Fix queue lookup in Job enqueue
alehaa 0b15ecf
Collapse SystemJob into BackgroundJob
alehaa 309ad29
Remove legacy JobResultStatusChoices
alehaa b17b205
Use queue 'low' for system jobs by default
alehaa 60e4e81
Add test cases for BackgroundJob handling
alehaa bd4a21c
Fix enqueue interval jobs
alehaa 4c2ba09
Honor schedule_at for job's enqueue_once
alehaa c047bf4
Switch BackgroundJob to regular methods
alehaa e65e87c
Fix background tasks documentation
alehaa 3fc3d37
Test enqueue in combination with enqueue_once
alehaa cecc2b8
Rename background jobs to tasks (to differentiate from RQ)
jeremystretch c098d1c
Touch up docs
jeremystretch b9cf078
Revert "Use queue 'low' for system jobs by default"
alehaa 32ebe7b
Remove system background job
alehaa ecf8e79
Fix runscript management command
alehaa 7f0a4e3
Use regular imports for ScriptJob
alehaa 85b9f65
Rename BackgroundJob to JobRunner
jeremystretch File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
# Background Jobs | ||
|
||
NetBox plugins can defer certain operations by enqueuing [background jobs](../../features/background-jobs.md), which are executed asynchronously by background workers. This is helpful for decoupling long-running processes from the user-facing request-response cycle. | ||
|
||
For example, your plugin might need to fetch data from a remote system. Depending on the amount of data and the responsiveness of the remote server, this could take a few minutes. Deferring this task to a queued job ensures that it can be completed in the background, without interrupting the user. The data it fetches can be made available once the job has completed. | ||
|
||
## Job Runners | ||
|
||
A background job implements a basic [Job](../../models/core/job.md) executor for all kinds of tasks. It has logic implemented to handle the management of the associated job object, rescheduling of periodic jobs in the given interval and error handling. Adding custom jobs is done by subclassing NetBox's `JobRunner` class. | ||
|
||
::: utilities.jobs.JobRunner | ||
|
||
#### Example | ||
|
||
```python title="jobs.py" | ||
from utilities.jobs import JobRunner | ||
|
||
|
||
class MyTestJob(JobRunner): | ||
class Meta: | ||
name = "My Test Job" | ||
|
||
def run(self, *args, **kwargs): | ||
obj = self.job.object | ||
# your logic goes here | ||
``` | ||
|
||
You can schedule the background job from within your code (e.g. from a model's `save()` method or a view) by calling `MyTestJob.enqueue()`. This method passes through all arguments to `Job.enqueue()`. However, no `name` argument must be passed, as the background job name will be used instead. | ||
|
||
### Attributes | ||
|
||
`JobRunner` attributes are defined under a class named `Meta` within the job. These are optional, but encouraged. | ||
|
||
#### `name` | ||
|
||
This is the human-friendly names of your background job. If omitted, the class name will be used. | ||
|
||
### Scheduled Jobs | ||
|
||
As described above, jobs can be scheduled for immediate execution or at any later time using the `enqueue()` method. However, for management purposes, the `enqueue_once()` method allows a job to be scheduled exactly once avoiding duplicates. If a job is already scheduled for a particular instance, a second one won't be scheduled, respecting thread safety. An example use case would be to schedule a periodic task that is bound to an instance in general, but not to any event of that instance (such as updates). The parameters of the `enqueue_once()` method are identical to those of `enqueue()`. | ||
|
||
!!! tip | ||
It is not forbidden to `enqueue()` additional jobs while an interval schedule is active. An example use of this would be to schedule a periodic daily synchronization, but also trigger additional synchronizations on demand when the user presses a button. | ||
|
||
#### Example | ||
|
||
```python title="jobs.py" | ||
from utilities.jobs import JobRunner | ||
|
||
|
||
class MyHousekeepingJob(JobRunner): | ||
class Meta: | ||
name = "Housekeeping" | ||
|
||
def run(self, *args, **kwargs): | ||
# your logic goes here | ||
``` | ||
|
||
```python title="__init__.py" | ||
from netbox.plugins import PluginConfig | ||
|
||
class MyPluginConfig(PluginConfig): | ||
def ready(self): | ||
from .jobs import MyHousekeepingJob | ||
MyHousekeepingJob.setup(interval=60) | ||
``` | ||
|
||
## Task queues | ||
|
||
Three task queues of differing priority are defined by default: | ||
|
||
* High | ||
* Default | ||
* Low | ||
|
||
Any tasks in the "high" queue are completed before the default queue is checked, and any tasks in the default queue are completed before those in the "low" queue. | ||
|
||
Plugins can also add custom queues for their own needs by setting the `queues` attribute under the PluginConfig class. An example is included below: | ||
|
||
```python | ||
class MyPluginConfig(PluginConfig): | ||
name = 'myplugin' | ||
... | ||
queues = [ | ||
'foo', | ||
'bar', | ||
] | ||
``` | ||
|
||
The `PluginConfig` above creates two custom queues with the following names `my_plugin.foo` and `my_plugin.bar`. (The plugin's name is prepended to each queue to avoid conflicts between plugins.) | ||
|
||
!!! warning "Configuring the RQ worker process" | ||
By default, NetBox's RQ worker process only services the high, default, and low queues. Plugins which introduce custom queues should advise users to either reconfigure the default worker, or run a dedicated worker specifying the necessary queues. For example: | ||
|
||
``` | ||
python manage.py rqworker my_plugin.foo my_plugin.bar | ||
``` |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,33 +1,33 @@ | ||
import logging | ||
|
||
from netbox.search.backends import search_backend | ||
from .choices import * | ||
from utilities.jobs import JobRunner | ||
from .choices import DataSourceStatusChoices | ||
from .exceptions import SyncError | ||
from .models import DataSource | ||
from rq.timeouts import JobTimeoutException | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def sync_datasource(job, *args, **kwargs): | ||
class SyncDataSourceJob(JobRunner): | ||
""" | ||
Call sync() on a DataSource. | ||
""" | ||
datasource = DataSource.objects.get(pk=job.object_id) | ||
|
||
try: | ||
job.start() | ||
datasource.sync() | ||
class Meta: | ||
name = 'Synchronization' | ||
|
||
# Update the search cache for DataFiles belonging to this source | ||
search_backend.cache(datasource.datafiles.iterator()) | ||
def run(self, *args, **kwargs): | ||
datasource = DataSource.objects.get(pk=self.job.object_id) | ||
|
||
job.terminate() | ||
try: | ||
datasource.sync() | ||
|
||
except Exception as e: | ||
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e)) | ||
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED) | ||
if type(e) in (SyncError, JobTimeoutException): | ||
logging.error(e) | ||
else: | ||
# Update the search cache for DataFiles belonging to this source | ||
search_backend.cache(datasource.datafiles.iterator()) | ||
|
||
except Exception as e: | ||
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED) | ||
if type(e) is SyncError: | ||
logging.error(e) | ||
raise e |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
import django.db.models.deletion | ||
from django.db import migrations, models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
|
||
dependencies = [ | ||
('contenttypes', '0002_remove_content_type_name'), | ||
('core', '0011_move_objectchange'), | ||
] | ||
|
||
operations = [ | ||
migrations.AlterField( | ||
model_name='job', | ||
name='object_type', | ||
field=models.ForeignKey( | ||
blank=True, | ||
null=True, | ||
on_delete=django.db.models.deletion.CASCADE, | ||
related_name='jobs', | ||
to='contenttypes.contenttype' | ||
), | ||
), | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,8 @@ class Job(models.Model): | |
to='contenttypes.ContentType', | ||
related_name='jobs', | ||
on_delete=models.CASCADE, | ||
blank=True, | ||
null=True | ||
) | ||
object_id = models.PositiveBigIntegerField( | ||
blank=True, | ||
|
@@ -197,25 +199,34 @@ def terminate(self, status=JobStatusChoices.STATUS_COMPLETED, error=None): | |
job_end.send(self) | ||
|
||
@classmethod | ||
def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, **kwargs): | ||
def enqueue(cls, func, instance=None, name='', user=None, schedule_at=None, interval=None, immediate=False, **kwargs): | ||
alehaa marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
Create a Job instance and enqueue a job using the given callable | ||
|
||
Args: | ||
func: The callable object to be enqueued for execution | ||
instance: The NetBox object to which this job pertains | ||
instance: The NetBox object to which this job pertains (optional) | ||
name: Name for the job (optional) | ||
user: The user responsible for running the job | ||
schedule_at: Schedule the job to be executed at the passed date and time | ||
interval: Recurrence interval (in minutes) | ||
immediate: Run the job immediately without scheduling it in the background. Should be used for interactive | ||
management commands only. | ||
""" | ||
object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False) | ||
rq_queue_name = get_queue_for_model(object_type.model) | ||
if schedule_at and immediate: | ||
raise ValueError("enqueue() cannot be called with values for both schedule_at and immediate.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should be internationalized with _( |
||
|
||
if instance: | ||
object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False) | ||
object_id = instance.pk | ||
else: | ||
object_type = object_id = None | ||
rq_queue_name = get_queue_for_model(object_type.model if object_type else None) | ||
queue = django_rq.get_queue(rq_queue_name) | ||
status = JobStatusChoices.STATUS_SCHEDULED if schedule_at else JobStatusChoices.STATUS_PENDING | ||
job = Job.objects.create( | ||
object_type=object_type, | ||
object_id=instance.pk, | ||
object_id=object_id, | ||
name=name, | ||
status=status, | ||
scheduled=schedule_at, | ||
|
@@ -224,8 +235,16 @@ def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval= | |
job_id=uuid.uuid4() | ||
) | ||
|
||
if schedule_at: | ||
# Run the job immediately, rather than enqueuing it as a background task. Note that this is a synchronous | ||
# (blocking) operation, and execution will pause until the job completes. | ||
if immediate: | ||
func(job_id=str(job.job_id), job=job, **kwargs) | ||
|
||
# Schedule the job to run at a specific date & time. | ||
elif schedule_at: | ||
queue.enqueue_at(schedule_at, func, job_id=str(job.job_id), job=job, **kwargs) | ||
|
||
# Schedule the job to run asynchronously at this first available opportunity. | ||
else: | ||
queue.enqueue(func, job_id=str(job.job_id), job=job, **kwargs) | ||
|
||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be good to give an example here of calling enqueue