-
Notifications
You must be signed in to change notification settings - Fork 62
/
queue_stats.ex
135 lines (108 loc) · 3.33 KB
/
queue_stats.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
defmodule Verk.QueueStats do
@moduledoc """
This process updates an `:ets` table with the following information per queue:

* Current amount of running jobs
* Amount of finished jobs
* Amount of failed jobs

It persists these stats to Redis from time to time.

It also holds information about the current status of queues. They can be:

* running
* idle
* pausing
* paused
"""
# The process is a GenStage consumer: `init/1` returns `{:consumer, ...}`.
use GenStage
require Logger
alias Verk.QueueStatsCounters
defmodule State do
  @moduledoc false

  # `queues` maps a queue name (as a string) to its last known status atom.
  defstruct [queues: %{}]
end
# Delay, in milliseconds, before the next `:persist_stats` message this process sends to itself.
@persist_interval 10_000
@doc false
# Starts the stage and registers it locally under this module's name.
def start_link, do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
@doc """
Lists the queues and their stats searching for a `prefix` if provided.

Returns a list with one map per queue. Each map has the keys `:queue`,
`:status`, `:running_counter`, `:finished_counter` and `:failed_counter`.

This is a synchronous call to the locally registered `Verk.QueueStats` process.
"""
@spec all(binary) :: [map]
def all(prefix \\ "") do
  GenServer.call(__MODULE__, {:all, prefix})
end
# Resolves the status to report for `queue`.
#
# `queues` is the map of known statuses kept in the process state; when it has
# no entry for `queue`, `Verk.Manager.status/1` is asked instead.
#
# `:idle` is not a status of its own: it is a running queue with no job
# currently running. Because previously reported statuses (including `:idle`)
# are stored back into `queues`, a stored `:idle` must be re-derived from
# `running_counter` as well; otherwise a queue that was idle once would keep
# being reported as idle after it started working again.
defp status(queue, queues, running_counter) do
  case queues[queue] || Verk.Manager.status(queue) do
    status when status in [:running, :idle] ->
      if running_counter == 0, do: :idle, else: :running

    status ->
      status
  end
end
@doc false
# Sets up the counters, schedules the first `:persist_stats` message and
# subscribes this consumer to `Verk.EventProducer`.
def init(_args) do
  QueueStatsCounters.init()
  _timer = Process.send_after(self(), :persist_stats, @persist_interval)

  subscription = [subscribe_to: [Verk.EventProducer]]
  {:consumer, %State{}, subscription}
end
# Serves `all/1`: builds one stats map per queue and remembers the status that
# was reported for each of them in the process state.
def handle_call({:all, prefix}, _from, state) do
  stats =
    prefix
    |> QueueStatsCounters.all()
    |> Enum.flat_map(fn
      # Only rows shaped as a 4-tuple whose queue name is a list are reported.
      {name, running, finished, failed} when is_list(name) ->
        queue = to_string(name)

        [
          %{
            queue: queue,
            status: status(queue, state.queues, running),
            running_counter: running,
            finished_counter: finished,
            failed_counter: failed
          }
        ]

      _other ->
        []
    end)

  reported = Map.new(stats, fn %{queue: queue, status: status} -> {queue, status} end)

  # No events are emitted by a consumer, hence the empty list.
  {:reply, stats, [], %State{queues: Map.merge(state.queues, reported)}}
end
# Folds every received event, in order, into the process state.
def handle_events(events, _from, state) do
  {:noreply, [], Enum.reduce(events, state, &handle_event/2)}
end
# Applies a single event to the state.
#
# Job events only touch the counters and leave the state untouched; queue
# events record the new status of the queue in `state.queues`.
defp handle_event(%Verk.Events.JobStarted{job: job}, state),
  do: register_job(:started, job, state)

defp handle_event(%Verk.Events.JobFinished{job: job}, state),
  do: register_job(:finished, job, state)

defp handle_event(%Verk.Events.JobFailed{job: job}, state),
  do: register_job(:failed, job, state)

defp handle_event(%Verk.Events.QueueRunning{queue: queue}, state) do
  QueueStatsCounters.reset_started(queue)
  put_status(state, queue, :running)
end

defp handle_event(%Verk.Events.QueuePausing{queue: queue}, state),
  do: put_status(state, queue, :pausing)

defp handle_event(%Verk.Events.QueuePaused{queue: queue}, state),
  do: put_status(state, queue, :paused)

# Registers a job event of the given `kind` for the job's queue; returns `state` unchanged.
defp register_job(kind, job, state) do
  QueueStatsCounters.register(kind, job.queue)
  state
end

# Stores `status` for `queue` (keyed by its string form) in the state.
defp put_status(state, queue, status) do
  %{state | queues: Map.put(state.queues, to_string(queue), status)}
end
@doc false
# Persists the counters, logs a failure if any, and schedules the next run.
def handle_info(:persist_stats, state) do
  outcome = QueueStatsCounters.persist()

  case outcome do
    {:error, reason} ->
      Logger.error("QueueStats failed to persist stats to Redis. Reason: #{inspect(reason)}")

    :ok ->
      :ok
  end

  # Re-arm the timer regardless of the outcome so persistence keeps being retried.
  Process.send_after(self(), :persist_stats, @persist_interval)
  {:noreply, [], state}
end

# Any other message is ignored.
def handle_info(_message, state), do: {:noreply, [], state}
end