Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

15692: Introduce background jobs #16927

Merged
merged 38 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
5fab8e4
Introduce reusable BackgroundJob framework
alehaa Jun 17, 2024
957bc3d
Restore using import_string for jobs
alehaa Jun 20, 2024
db591d4
Use SyncDataSourceJob for management command
alehaa Jun 20, 2024
53a4420
Implement BackgroundJob for running scripts
alehaa Jun 21, 2024
7fb1875
Fix documentation of model features
alehaa Jun 30, 2024
212262d
Ensure consitent code style
alehaa Jun 30, 2024
9dc6099
Introduce reusable ScheduledJob
alehaa Jul 1, 2024
4880d81
Introduce reusable SystemJob
alehaa Jul 1, 2024
d78ddfc
Add documentation for jobs framework
alehaa Jul 1, 2024
fd8d537
Merge branch 'feature' into 15692-background-jobs
alehaa Jul 16, 2024
15f888c
Revert "Use SyncDataSourceJob for management"
alehaa Jul 24, 2024
7d15ec0
Merge enqueued status into JobStatusChoices
alehaa Jul 24, 2024
9f1989c
Fix logger for ScriptJob
alehaa Jul 24, 2024
257976d
Remove job name for scripts
alehaa Jul 24, 2024
58089c7
Merge ScheduledJob into BackgroundJob
alehaa Jul 24, 2024
fb75389
Add name attribute for BackgroundJob
alehaa Jul 24, 2024
654e6e7
Drop enqueue_sync_job() method from DataSource
jeremystretch Jul 24, 2024
62380fb
Import ScriptJob directly
jeremystretch Jul 24, 2024
d6432fb
Relax requirement for Jobs to reference a specific object
jeremystretch Jul 24, 2024
b3f122a
Rename 'run_now' arg on Job.enqueue() to 'immediate'
jeremystretch Jul 24, 2024
3e1cc1b
Merge branch '15692-cherry' into 15692-background-jobs
alehaa Jul 25, 2024
bcad8cf
Fix queue lookup in Job enqueue
alehaa Jul 25, 2024
0b15ecf
Collapse SystemJob into BackgroundJob
alehaa Jul 25, 2024
309ad29
Remove legacy JobResultStatusChoices
alehaa Jul 25, 2024
b17b205
Use queue 'low' for system jobs by default
alehaa Jul 25, 2024
60e4e81
Add test cases for BackgroundJob handling
alehaa Jul 25, 2024
bd4a21c
Fix enqueue interval jobs
alehaa Jul 25, 2024
4c2ba09
Honor schedule_at for job's enqueue_once
alehaa Jul 25, 2024
c047bf4
Switch BackgroundJob to regular methods
alehaa Jul 25, 2024
e65e87c
Fix background tasks documentation
alehaa Jul 25, 2024
3fc3d37
Test enqueue in combination with enqueue_once
alehaa Jul 25, 2024
cecc2b8
Rename background jobs to tasks (to differentiate from RQ)
jeremystretch Jul 29, 2024
c098d1c
Touch up docs
jeremystretch Jul 29, 2024
b9cf078
Revert "Use queue 'low' for system jobs by default"
alehaa Jul 29, 2024
32ebe7b
Remove system background job
alehaa Jul 30, 2024
ecf8e79
Fix runscript management command
alehaa Jul 30, 2024
7f0a4e3
Use regular imports for ScriptJob
alehaa Jul 30, 2024
85b9f65
Rename BackgroundJob to JobRunner
jeremystretch Jul 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/development/models.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Depending on its classification, each NetBox model may support various features
| [Custom links](../customization/custom-links.md) | `CustomLinksMixin` | `custom_links` | These models support the assignment of custom links |
| [Custom validation](../customization/custom-validation.md) | `CustomValidationMixin` | - | Supports the enforcement of custom validation rules |
| [Export templates](../customization/export-templates.md) | `ExportTemplatesMixin` | `export_templates` | Users can create custom export templates for these models |
| [Job results](../features/background-jobs.md) | `JobsMixin` | `jobs` | Users can create custom export templates for these models |
| [Job results](../features/background-jobs.md) | `JobsMixin` | `jobs` | Background jobs can be scheduled for these models |
| [Journaling](../features/journaling.md) | `JournalingMixin` | `journaling` | These models support persistent historical commentary |
| [Synchronized data](../integrations/synchronized-data.md) | `SyncedDataMixin` | `synced_data` | Certain model data can be automatically synchronized from a remote data source |
| [Tagging](../models/extras/tag.md) | `TagsMixin` | `tags` | The models can be tagged with user-defined tags |
Expand Down
72 changes: 71 additions & 1 deletion docs/plugins/development/background-tasks.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,76 @@
# Background Tasks

NetBox supports the queuing of tasks that need to be performed in the background, decoupled from the request-response cycle, using the [Python RQ](https://python-rq.org/) library. Three task queues of differing priority are defined by default:
NetBox supports the queuing of tasks that need to be performed in the background, decoupled from the request-response cycle.


## High level API

NetBox provides an easy-to-use interface for programming and managing different types of jobs. In general, there are different types of jobs that can be used to perform any kind of background task. Due to inheritance, the general job logic remains the same, but each of them fulfills a specific task and has its own management logic around it.

### Background Job

A background job implements a basic [Job](../../models/core/job.md) executor for all kinds of tasks. It has logic implemented to handle the management of the associated job object, rescheduling of periodic jobs in the given interval and error handling. Adding custom jobs is done by subclassing NetBox's `BackgroundJob` class.

**Example:**

```python title="jobs.py"
from utilities.jobs import BackgroundJob

class MyTestJob(BackgroundJob):
@classmethod
def run(cls, job, *args, **kwargs):
obj = job.object
# your logic goes here
```

You can schedule the background job from within your code (e.g. from a model's `save()` method or a view) by calling `MyTestJob.enqueue()`. This method passes through all arguments to `Job.enqueue()`.

::: core.models.Job.enqueue

### Scheduled Job

During execution, a scheduled job behaves like a background job and is therefore implemented in the same way, but must be subclassed from NetBox's `ScheduledJob` class.

However, for management purposes, a `schedule()` method allows a schedule to be set exactly once to avoid duplicates. If a job is already scheduled for a particular instance, a second one won't be scheduled, respecting thread safety. An example use case would be to schedule a periodic task that is bound to an instance in general, but not to any event of that instance (such as updates). The parameters of the `schedule()` method are identical to those of `enqueue()`. Note that this class doesn't allow you to pass the `name` parameter for both methods, but uses a generic name instead.

!!! tip
It is not forbidden to `enqueue()` additional jobs while an interval schedule is active. An example use of this would be to schedule a periodic daily synchronization, but also trigger additional synchronizations on demand when the user presses a button.

### System Job

The last type of job is a system job that is not bound to any particular instance. A typical use case for these jobs is a general synchronization of NetBox objects from another system or housekeeping. The implementation of system jobs is the same as for background and scheduled jobs, but they must be subclassed from NetBox's `SystemJob` class. In addition to avoiding the `name` parameter, no `instance` parameter may be passed to `enqueue()`, as a placeholder will be used instead.

Typically, a system job is set up during NetBox startup when the plugin is loaded. This ensures that the job is running in the background even when no requests are being processed. For this purpose, the `setup()` method can be used to setup a new schedule outside of the request-response cycle. It can be safely called from the plugin's ready function and will register the new schedule right after all plugins are loaded and the database is connected.

**Example:**

```python title="jobs.py"
from utilities.jobs import SystemJob

class MyHousekeepingJob(SystemJob):
@classmethod
def run(cls, *args, **kwargs):
# your logic goes here
pass
```
```python title="__init__.py"
from netbox.plugins import PluginConfig

class MyPluginConfig(PluginConfig):
def ready(self):
from .jobs import MyHousekeepingJob
MyHousekeepingJob.setup(interval=60)
```


## Low Level API

Instead of using the high-level APIs provided by NetBox, plugins may access the task scheduler directly using the [Python RQ](https://python-rq.org/) library. This allows scheduling background tasks without the need to add [Job](../../models/core/job.md) to the database or implementing custom job handling.


## Task queues

Three task queues of differing priority are defined by default:

* High
* Default
Expand Down
1 change: 1 addition & 0 deletions docs/plugins/development/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ project-name/
- __init__.py
- filtersets.py
- graphql.py
- jobs.py
- models.py
- middleware.py
- navigation.py
Expand Down
2 changes: 2 additions & 0 deletions docs/plugins/development/models.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ For more information about database migrations, see the [Django documentation](h

::: netbox.models.features.ExportTemplatesMixin

::: netbox.models.features.JobsMixin

::: netbox.models.features.JournalingMixin

::: netbox.models.features.TagsMixin
Expand Down
30 changes: 14 additions & 16 deletions netbox/core/jobs.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,31 @@
import logging

from netbox.search.backends import search_backend
from .choices import *
from utilities.jobs import BackgroundJob
from .choices import DataSourceStatusChoices
from .exceptions import SyncError
from .models import DataSource
from rq.timeouts import JobTimeoutException

logger = logging.getLogger(__name__)


def sync_datasource(job, *args, **kwargs):
class SyncDataSourceJob(BackgroundJob):
"""
Call sync() on a DataSource.
"""
datasource = DataSource.objects.get(pk=job.object_id)

try:
job.start()
datasource.sync()
@classmethod
def run(cls, job, *args, **kwargs):
datasource = DataSource.objects.get(pk=job.object_id)

# Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator())
try:
datasource.sync()

job.terminate()
# Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator())

except Exception as e:
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
if type(e) in (SyncError, JobTimeoutException):
logging.error(e)
else:
except Exception as e:
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
if type(e) is SyncError:
logging.error(e)
raise e
13 changes: 6 additions & 7 deletions netbox/core/management/commands/syncdatasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,12 @@ def handle(self, *args, **options):
for i, datasource in enumerate(datasources, start=1):
self.stdout.write(f"[{i}] Syncing {datasource}... ", ending='')
self.stdout.flush()
try:
datasource.sync()
self.stdout.write(datasource.get_status_display())
self.stdout.flush()
except Exception as e:
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
raise e

datasource.enqueue_sync_job()
alehaa marked this conversation as resolved.
Show resolved Hide resolved
datasource.refresh_from_db()

self.stdout.write(datasource.get_status_display())
self.stdout.flush()

if len(options['name']) > 1:
self.stdout.write(f"Finished.")
9 changes: 5 additions & 4 deletions netbox/core/models/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def to_objectchange(self, action):

return objectchange

def enqueue_sync_job(self, request):
def enqueue_sync_job(self, request=None):
"""
Enqueue a background job to synchronize the DataSource by calling sync().
"""
Expand All @@ -162,10 +162,11 @@ def enqueue_sync_job(self, request):
DataSource.objects.filter(pk=self.pk).update(status=self.status)

# Enqueue a sync job
return Job.enqueue(
import_string('core.jobs.sync_datasource'),
SyncDataSourceJob = import_string('core.jobs.SyncDataSourceJob')
return SyncDataSourceJob.enqueue(
instance=self,
user=request.user
user=(request.user if request else None),
run_now=(request is None),
)

def get_backend(self):
Expand Down
10 changes: 9 additions & 1 deletion netbox/core/models/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def terminate(self, status=JobStatusChoices.STATUS_COMPLETED, error=None):
job_end.send(self)

@classmethod
def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, **kwargs):
def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, run_now=False, **kwargs):
"""
Create a Job instance and enqueue a job using the given callable

Expand All @@ -208,6 +208,8 @@ def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=
user: The user responsible for running the job
schedule_at: Schedule the job to be executed at the passed date and time
interval: Recurrence interval (in minutes)
run_now: Run the job immediately without scheduling it in the background. Should be used for interactive
management commands only.
"""
object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
rq_queue_name = get_queue_for_model(object_type.model)
Expand All @@ -224,6 +226,12 @@ def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=
job_id=uuid.uuid4()
)

# Optionally, the job can be run immediately without being scheduled to run in the background.
if run_now:
func(job_id=str(job.job_id), job=job, **kwargs)
return job
jeremystretch marked this conversation as resolved.
Show resolved Hide resolved

# Schedule the job to run asynchronously in the background.
if schedule_at:
queue.enqueue_at(schedule_at, func, job_id=str(job.job_id), job=job, **kwargs)
else:
Expand Down
6 changes: 3 additions & 3 deletions netbox/extras/api/views.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from django.http import Http404
from django.shortcuts import get_object_or_404
from django.utils.module_loading import import_string
from django_rq.queues import get_connection
from rest_framework import status
from rest_framework.decorators import action
Expand All @@ -14,7 +15,6 @@
from core.models import Job, ObjectType
from extras import filtersets
from extras.models import *
from extras.scripts import run_script
from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired
from netbox.api.features import SyncedDataMixin
from netbox.api.metadata import ContentTypeMetadata
Expand Down Expand Up @@ -273,8 +273,8 @@ def post(self, request, pk):
raise RQWorkerNotRunningException()

if input_serializer.is_valid():
Job.enqueue(
run_script,
ScriptJob = import_string("extras.jobs.ScriptJob")
alehaa marked this conversation as resolved.
Show resolved Hide resolved
ScriptJob.enqueue(
instance=script,
name=script.python_class.class_name,
user=request.user,
Expand Down
5 changes: 3 additions & 2 deletions netbox/extras/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from django.utils.translation import gettext as _
from django_rq import get_queue

from core.choices import ObjectChangeActionChoices
from core.events import *
from core.models import Job
from netbox.config import get_config
Expand Down Expand Up @@ -125,8 +126,8 @@ def process_event_rules(event_rules, object_type, event_type, data, username=Non
script = event_rule.action_object.python_class()

# Enqueue a Job to record the script's execution
Job.enqueue(
"extras.scripts.run_script",
ScriptJob = import_string("extras.jobs.ScriptJob")
ScriptJob.enqueue(
instance=event_rule.action_object,
name=script.name,
user=user,
Expand Down
105 changes: 105 additions & 0 deletions netbox/extras/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import logging
import traceback
from contextlib import nullcontext

from django.db import transaction
from django.utils.translation import gettext as _

from extras.models import Script as ScriptModel
from extras.signals import clear_events
from netbox.context_managers import event_tracking
from utilities.exceptions import AbortScript, AbortTransaction
from utilities.jobs import BackgroundJob
from .utils import is_report


class ScriptJob(BackgroundJob):
"""
Script execution job.

A wrapper for calling Script.run(). This performs error handling and provides a hook for committing changes. It
exists outside the Script class to ensure it cannot be overridden by a script author.
"""

@staticmethod
def run_script(script, job, request, data, commit):
"""
Core script execution task. We capture this within a method to allow for conditionally wrapping it with the
event_tracking context manager (which is bypassed if commit == False).

Args:
job: The Job associated with this execution
request: The WSGI request associated with this execution (if any)
data: A dictionary of data to be passed to the script upon execution
commit: Passed through to Script.run()
"""
try:
try:
with transaction.atomic():
script.output = script.run(data, commit)
if not commit:
raise AbortTransaction()
except AbortTransaction:
script.log_info(message=_("Database changes have been reverted automatically."))
if script.failed:
logger.warning(f"Script failed")
raise

except Exception as e:
if type(e) is AbortScript:
msg = _("Script aborted with error: ") + str(e)
if is_report(type(script)):
script.log_failure(message=msg)
else:
script.log_failure(msg)
logger.error(f"Script aborted with error: {e}")

else:
stacktrace = traceback.format_exc()
script.log_failure(
message=_("An exception occurred: ") + f"`{type(e).__name__}: {e}`\n```\n{stacktrace}\n```"
)
logger.error(f"Exception raised during script execution: {e}")

if type(e) is not AbortTransaction:
script.log_info(message=_("Database changes have been reverted due to error."))

# Clear all pending events. Job termination (including setting the status) is handled by the job framework.
if request:
clear_events.send(request)
raise

# Update the job data regardless of the execution status of the job. Successes should be reported as well as
# failures.
finally:
job.data = script.get_job_data()

@classmethod
def run(cls, job, data, request=None, commit=True, **kwargs):
"""
Run the script.

Args:
job: The Job associated with this execution
data: A dictionary of data to be passed to the script upon execution
request: The WSGI request associated with this execution (if any)
commit: Passed through to Script.run()
"""
script = ScriptModel.objects.get(pk=job.object_id).python_class()

logger = logging.getLogger(f"netbox.scripts.{script.full_name}")
logger.info(f"Running script (commit={commit})")

# Add files to form data
if request:
files = request.FILES
for field_name, fileobj in files.items():
data[field_name] = fileobj

# Add the current request as a property of the script
script.request = request

# Execute the script. If commit is True, wrap it with the event_tracking context manager to ensure we process
# change logging, event rules, etc.
with event_tracking(request) if commit else nullcontext():
cls.run_script(script, job, request, data, commit)
Loading