diff --git a/lib/oban/queue/executor.ex b/lib/oban/queue/executor.ex index f54cb446..ac569b9a 100644 --- a/lib/oban/queue/executor.ex +++ b/lib/oban/queue/executor.ex @@ -36,6 +36,7 @@ defmodule Oban.Queue.Executor do :error, :job, :meta, + :pid, :result, :snooze, :start_mono, @@ -58,6 +59,7 @@ defmodule Oban.Queue.Executor do conf: conf, job: %{job | conf: conf}, meta: event_metadata(conf, job), + pid: self(), safe: Keyword.get(opts, :safe, true), start_mono: System.monotonic_time(), start_time: System.system_time() @@ -260,8 +262,6 @@ defmodule Oban.Queue.Executor do @spec emit_event(t()) :: t() def emit_event(%__MODULE__{state: state} = exec) when state in [:failure, :exhausted] do - measurements = %{duration: exec.duration, queue_time: exec.queue_time} - kind = case exec.kind do {:EXIT, _pid} -> :exit @@ -281,15 +281,13 @@ defmodule Oban.Queue.Executor do state: state }) - :telemetry.execute([:oban, :job, :exception], measurements, meta) + :telemetry.execute([:oban, :job, :exception], measurements(exec), meta) exec end def emit_event(%__MODULE__{state: state} = exec) when state in [:cancelled, :success, :snoozed, :discard] do - measurements = %{duration: exec.duration, queue_time: exec.queue_time} - meta = Map.merge(exec.meta, %{ job: exec.job, @@ -297,7 +295,7 @@ defmodule Oban.Queue.Executor do result: exec.result }) - :telemetry.execute([:oban, :job, :stop], measurements, meta) + :telemetry.execute([:oban, :job, :stop], measurements(exec), meta) exec end @@ -321,6 +319,22 @@ defmodule Oban.Queue.Executor do |> Map.merge(%{conf: conf, job: job, prefix: conf.prefix}) end + defp measurements(exec) do + %{ + duration: exec.duration, + memory: info_for(exec, :memory), + queue_time: exec.queue_time, + reductions: info_for(exec, :reductions) + } + end + + defp info_for(%__MODULE__{pid: pid}, item) do + case Process.info(pid, item) do + {^item, value} -> value + nil -> 0 + end + end + defp log_warning(%__MODULE__{safe: true, worker: worker}, returned) do Logger.warning(fn -> """ diff --git a/lib/oban/telemetry.ex b/lib/oban/telemetry.ex index 2b26922a..a3937900 100644 --- a/lib/oban/telemetry.ex +++ b/lib/oban/telemetry.ex @@ -29,18 +29,18 @@ defmodule Oban.Telemetry do provide the error type, the error itself, and the stacktrace. The following chart shows which metadata you can expect for each event: - | event | measures | metadata | - | ------------ | -------------------------- | ----------------------------------------------------------------------- | - | `:start` | `:system_time` | `:conf`, `:job` | - | `:stop` | `:duration`, `:queue_time` | `:conf`, `:job`, `:state`, `:result` | - | `:exception` | `:duration`, `:queue_time` | `:conf`, `:job`, `:state`, `:kind`, `:reason`, `:result`, `:stacktrace` | + | event | measures | metadata | + | ------------ | ---------------------------------------------------- | ----------------------------------------------------------------------- | + | `:start` | `:system_time` | `:conf`, `:job` | + | `:stop` | `:duration`, `:memory`, `:queue_time`, `:reductions` | `:conf`, `:job`, `:state`, `:result` | + | `:exception` | `:duration`, `:memory`, `:queue_time`, `:reductions` | `:conf`, `:job`, `:state`, `:kind`, `:reason`, `:result`, `:stacktrace` | #### Metadata * `:conf` — the executing Oban instance's config * `:job` — the executing `Oban.Job` - * `:state` — one of `:success`, `:failure`, `:cancelled`, `:discard` or `:snoozed` * `:result` — the `perform/1` return value, always `nil` for an exception or crash + * `:state` — one of `:success`, `:failure`, `:cancelled`, `:discard` or `:snoozed` For `:exception` events the metadata also includes details about what caused the failure. The `:kind` value is determined by how an error occurred. Here are the possible kinds: diff --git a/test/oban/telemetry_test.exs b/test/oban/telemetry_test.exs index 42c9f945..ea9edbb3 100644 --- a/test/oban/telemetry_test.exs +++ b/test/oban/telemetry_test.exs @@ -47,11 +47,17 @@ defmodule Oban.TelemetryTest do %Job{id: error_id} = insert!([ref: 2, action: "ERROR"], tags: ["foo"]) assert_receive {:event, :start, started_time, start_meta} - assert_receive {:event, :stop, %{duration: stop_duration, queue_time: queue_time}, stop_meta} - assert_receive {:event, :exception, error_duration, %{kind: :error} = error_meta} + assert_receive {:event, :stop, stop_meas, stop_meta} + assert_receive {:event, :exception, error_meas, %{kind: :error} = error_meta} + + assert %{duration: stop_duration, queue_time: queue_time} = stop_meas + assert %{memory: stop_memory, reductions: stop_reductions} = stop_meas + assert %{duration: error_duration, queue_time: _} = error_meas assert started_time > 0 assert stop_duration > 0 + assert stop_memory > 0 + assert stop_reductions > 0 assert queue_time > 0 assert error_duration > 0 diff --git a/test/support/telemetry_handler.ex b/test/support/telemetry_handler.ex index 5cf41eb2..770cd32e 100644 --- a/test/support/telemetry_handler.ex +++ b/test/support/telemetry_handler.ex @@ -38,12 +38,8 @@ defmodule Oban.TelemetryHandler do send(pid, {:event, :start, start_time, meta}) end - def handle([:oban, :job, :stop], measure, meta, pid) do - send(pid, {:event, :stop, measure, meta}) - end - - def handle([:oban, :job, event], %{duration: duration}, meta, pid) do - send(pid, {:event, event, duration, meta}) + def handle([:oban, :job, event], measure, meta, pid) do + send(pid, {:event, event, measure, meta}) end def handle([:oban, :engine | event], measure, meta, pid) do