Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: New fetch job function. #1241

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions procrastinate/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,16 @@ async def fetch_job(self, queues: Iterable[str] | None) -> jobs.Job | None:
:
None if no suitable job was found. The job otherwise.
"""

row = await self.connector.execute_query_one_async(
query=sql.queries["fetch_job"], queues=queues
)
try:
row = await self.connector.execute_query_one_async(
query=sql.queries["fetch_job"], queues=queues
)
except exceptions.UniqueViolation as _exc:
# Fetching a job while honouring locks led to a unique-violation conflict;
# fall back to fetching any runnable job that has no lock instead.
row = await self.connector.execute_query_one_async(
query=sql.queries["fetch_job_without_lock"], queues=queues
)

# fetch_tasks will always return a row, but if there's no relevant
# value, its fields will all be None
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
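-- New fetch job: a lock only blocks a job while another job with the same lock
-- is 'doing'. On a unique violation the fetch is retried (3 attempts in total);
-- if the last attempt also conflicts, the error is re-raised to the caller.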
CREATE OR REPLACE FUNCTION procrastinate.procrastinate_fetch_job(target_queue_names character varying[])
    RETURNS procrastinate.procrastinate_jobs
    LANGUAGE plpgsql
AS $function$
-- Claim the next runnable job: pick one 'todo' job, switch it to 'doing' and
-- return the updated row.
--
-- Parameters:
--   target_queue_names  queues to fetch from; NULL means "any queue".
--
-- Returns:
--   The claimed job row. When no job qualifies, the UPDATE touches no row and
--   the returned composite has all of its fields set to NULL.
--
-- Raises:
--   unique_violation, re-raised unchanged once max_attempts attempts have all
--   conflicted.
--
-- NOTE(review): the function and its return type are qualified with the
-- "procrastinate" schema while the body (and the callers in queries.sql) use
-- unqualified names, so this relies on search_path -- confirm this is intended.
DECLARE
    found_jobs procrastinate_jobs;
    retry_count INT := 0;
    -- Total number of attempts (first try included) before giving up.
    max_attempts CONSTANT INT := 3;
BEGIN
    LOOP
        -- Each iteration is its own sub-block so a unique_violation only
        -- rolls back this attempt and the loop can try again.
        BEGIN
            WITH candidate AS (
                -- Only the id is needed to drive the UPDATE below.
                SELECT jobs.id
                FROM procrastinate_jobs AS jobs
                WHERE
                    (
                        jobs.lock IS NULL
                        OR NOT EXISTS ( -- reject the job if its lock has current jobs
                            SELECT 1
                            FROM procrastinate_jobs AS jobs_with_locks
                            WHERE
                                jobs.lock IS NOT NULL
                                AND jobs_with_locks.lock = jobs.lock
                                AND jobs_with_locks.status = 'doing'
                            LIMIT 1
                        )
                    )
                    AND jobs.status = 'todo'
                    AND (target_queue_names IS NULL OR jobs.queue_name = ANY(target_queue_names))
                    AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now())
                -- Highest priority first, oldest id first among equals.
                ORDER BY jobs.priority DESC, jobs.id ASC
                LIMIT 1
                -- Rows already row-locked by another transaction are skipped
                -- instead of waited on.
                FOR UPDATE OF jobs SKIP LOCKED
            )
            UPDATE procrastinate_jobs
            SET status = 'doing'
            FROM candidate
            WHERE procrastinate_jobs.id = candidate.id
            RETURNING procrastinate_jobs.* INTO found_jobs;

            RETURN found_jobs;
        EXCEPTION
            -- Presumably raised by a unique constraint allowing a single
            -- 'doing' job per lock when two fetches race -- the constraint is
            -- not visible here; TODO confirm.
            WHEN unique_violation THEN
                retry_count := retry_count + 1;
                IF retry_count >= max_attempts THEN
                    RAISE; -- re-raise the current unique_violation
                END IF;
                -- Fuzzy back-off of up to 10ms, only when another attempt
                -- will actually follow.
                PERFORM pg_sleep(random() * 0.01);
        END;
    END LOOP;
END;
$function$;

CREATE OR REPLACE FUNCTION procrastinate.procrastinate_fetch_job_without_lock(target_queue_names character varying[])
    RETURNS procrastinate.procrastinate_jobs
    LANGUAGE plpgsql
AS $$
-- Claim the next runnable job that carries no lock: pick one 'todo' job whose
-- lock is NULL, switch it to 'doing' and return the updated row.
--
-- Parameters:
--   target_queue_names  queues to fetch from; NULL means "any queue".
--
-- Returns:
--   The claimed job row. When no job qualifies, no row is updated and the
--   returned composite has all of its fields set to NULL.
DECLARE
    claimed_job procrastinate_jobs;
BEGIN
    WITH next_unlocked AS (
        SELECT queued.*
        FROM procrastinate_jobs AS queued
        WHERE
            queued.status = 'todo'
            AND queued.lock IS NULL
            AND (queued.scheduled_at IS NULL OR queued.scheduled_at <= now())
            AND (target_queue_names IS NULL OR queued.queue_name = ANY(target_queue_names))
        -- Highest priority first, oldest id first among equals.
        ORDER BY queued.priority DESC, queued.id ASC
        LIMIT 1
        -- Rows already row-locked by another transaction are skipped.
        FOR UPDATE OF queued SKIP LOCKED
    )
    UPDATE procrastinate_jobs AS claimed
    SET status = 'doing'
    FROM next_unlocked
    WHERE claimed.id = next_unlocked.id
    RETURNING claimed.* INTO claimed_job;

    RETURN claimed_job;
END;
$$;
5 changes: 5 additions & 0 deletions procrastinate/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ SELECT procrastinate_defer_periodic_job(%(queue)s, %(lock)s, %(queueing_lock)s,
SELECT id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts
FROM procrastinate_fetch_job(%(queues)s);

-- fetch_job_without_lock --
-- Get the first awaiting job that has no lock (fallback for when fetch_job
-- raises a unique violation)
SELECT id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts
FROM procrastinate_fetch_job_without_lock(%(queues)s);

-- select_stalled_jobs --
-- Get running jobs that started more than a given time ago
SELECT job.id, status, task_name, priority, lock, queueing_lock, args, scheduled_at, queue_name, attempts, max(event.at) started_at
Expand Down
95 changes: 67 additions & 28 deletions procrastinate/sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -157,44 +157,83 @@ BEGIN
END;
$$;

CREATE FUNCTION procrastinate_fetch_job(
target_queue_names character varying[]
)
RETURNS procrastinate_jobs
LANGUAGE plpgsql
CREATE OR REPLACE FUNCTION procrastinate.procrastinate_fetch_job(target_queue_names character varying[])
RETURNS procrastinate.procrastinate_jobs
LANGUAGE plpgsql
AS $$
DECLARE
found_jobs procrastinate_jobs;
found_jobs procrastinate_jobs;
retry_count INT := 0;
BEGIN
WITH candidate AS (
SELECT jobs.*
FROM procrastinate_jobs AS jobs
WHERE
-- reject the job if its lock has earlier jobs
NOT EXISTS (
SELECT 1
FROM procrastinate_jobs AS earlier_jobs
LOOP
BEGIN
WITH candidate AS (
SELECT jobs.*
FROM procrastinate_jobs AS jobs
WHERE
(jobs.lock IS NULL OR
NOT EXISTS ( -- reject the job if its lock has current jobs
SELECT 1
FROM procrastinate_jobs AS jobs_with_locks
WHERE
jobs.lock IS NOT NULL
AND earlier_jobs.lock = jobs.lock
AND earlier_jobs.status IN ('todo', 'doing', 'aborting')
AND earlier_jobs.id < jobs.id)
AND jobs.status = 'todo'
AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names ))
AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now())
ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1
FOR UPDATE OF jobs SKIP LOCKED
AND jobs_with_locks.lock = jobs.lock
AND jobs_with_locks.status = 'doing'
LIMIT 1
))
AND jobs.status = 'todo'
AND (target_queue_names IS NULL OR jobs.queue_name = ANY(target_queue_names))
AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now())
ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1
FOR UPDATE OF jobs SKIP LOCKED
)
UPDATE procrastinate_jobs
SET status = 'doing'
FROM candidate
WHERE procrastinate_jobs.id = candidate.id
RETURNING procrastinate_jobs.* INTO found_jobs;

RETURN found_jobs;
EXCEPTION
WHEN unique_violation THEN
PERFORM pg_sleep(random() * 0.01); -- fuzzy retry with upto 10ms sleep
retry_count := retry_count + 1;
IF retry_count >= 3 THEN
RAISE; --reraise
END IF;
END;
END LOOP;
END;
$$;

CREATE OR REPLACE FUNCTION procrastinate.procrastinate_fetch_job_without_lock(target_queue_names character varying[])
RETURNS procrastinate.procrastinate_jobs
LANGUAGE plpgsql
AS $$
DECLARE
found_jobs procrastinate_jobs;
BEGIN
WITH candidate AS (
SELECT jobs.*
FROM procrastinate_jobs AS jobs
WHERE
jobs.lock IS NULL
AND jobs.status = 'todo'
AND (target_queue_names IS NULL OR jobs.queue_name = ANY(target_queue_names))
AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now())
ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1
FOR UPDATE OF jobs SKIP LOCKED
)
UPDATE procrastinate_jobs
SET status = 'doing'
FROM candidate
WHERE procrastinate_jobs.id = candidate.id
RETURNING procrastinate_jobs.* INTO found_jobs;

RETURN found_jobs;
SET status = 'doing'
FROM candidate
WHERE procrastinate_jobs.id = candidate.id
RETURNING procrastinate_jobs.* INTO found_jobs;
RETURN found_jobs;
END;
$$;


-- procrastinate_finish_job
-- the next_scheduled_at argument is kept for compatibility reasons, it is to be
-- removed after 1.0.0 is released
Expand Down
Loading