-
Notifications
You must be signed in to change notification settings - Fork 36
/
worker.ex
47 lines (41 loc) · 1.73 KB
/
worker.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
defmodule EctoJob.Worker do
@moduledoc """
Worker module responsible for executing a single Job
"""
alias EctoJob.{Config, JobQueue}
require Logger
@type repo :: module
@doc """
Equivalent to `start_link(config, job, DateTime.utc_now())`
"""
@spec start_link(Config.t, EctoJob.JobQueue.job()) :: {:ok, pid}
def start_link(config, job), do: start_link(config, job, DateTime.utc_now())
@doc """
Start a worker process given a repo module and a job struct
This may fail if the job reservation has expired, in which case the job will be
reactivated by the producer.
"""
@spec start_link(Config.t, EctoJob.JobQueue.job(), DateTime.t()) :: {:ok, pid}
def start_link(config = %Config{repo: repo, execution_timeout: timeout}, job = %queue{}, now) do
Task.start_link(fn ->
with {:ok, job} <- JobQueue.update_job_in_progress(repo, job, now, timeout) do
queue.perform(JobQueue.initial_multi(job), job.params)
log_duration(config, job, now)
notify_completed(repo, job)
end
end)
end
@spec log_duration(Config.t, EctoJob.JobQueue.job(), DateTime.t()) :: :ok
defp log_duration(%Config{log: true, log_level: log_level}, _job = %queue{id: id}, start = %DateTime{}) do
duration = DateTime.diff(DateTime.utc_now(), start, :microseconds)
Logger.log(log_level, fn -> "#{queue}[#{id}] done: #{duration} µs" end)
end
defp log_duration(_config, _job, _start), do: :ok
@spec notify_completed(repo, EctoJob.JobQueue.job()) :: :ok
defp notify_completed(_repo, _job = %{notify: nil}), do: :ok
defp notify_completed(repo, _job = %queue{notify: payload}) do
topic = queue.__schema__(:source) <> ".completed"
repo.query("SELECT pg_notify($1, $2)", [topic, payload])
:ok
end
end