From a9f7a8c33cc8d245ec2a09ceec1d220f443c6de8 Mon Sep 17 00:00:00 2001 From: Parker Selbert Date: Mon, 2 Sep 2024 10:46:52 -0500 Subject: [PATCH] Use concat operator for error concatenation The standard `push` operation for updates is designed for arrays and uses `array_append` internally. This replaces all use of `push` with a fragment that uses the `||` operator instead, which works for both arrays and jsonb. CRDB (Cockroach) doesn't support arrays of jsonb, but they do support simple jsonb columns. Now we can append to the errors column in either format for CRDB compatibility. --- lib/oban/engines/basic.ex | 47 +++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/lib/oban/engines/basic.ex b/lib/oban/engines/basic.ex index 638e09dc..1ff70b18 100644 --- a/lib/oban/engines/basic.ex +++ b/lib/oban/engines/basic.ex @@ -17,6 +17,14 @@ defmodule Oban.Engines.Basic do alias Ecto.Changeset alias Oban.{Config, Engine, Job, Repo} + # This is a replacement for `push`, which uses `array_append` and isn't compatible with jsonb + # arrays. The `||` operator works with both arrays and jsonb. + defmacrop concat_errors(column, error) do + quote do + fragment("? || ?", unquote(column), unquote(error)) + end + end + @impl Engine def init(%Config{} = conf, opts) do if Keyword.has_key?(opts, :limit) do @@ -194,24 +202,36 @@ defmodule Oban.Engines.Basic do @impl Engine def discard_job(%Config{} = conf, %Job{} = job) do - updates = [ - set: [state: "discarded", discarded_at: utc_now()], - push: [errors: Job.format_attempt(job)] - ] + query = + Job + |> where(id: ^job.id) + |> update([j], + set: [ + state: "discarded", + discarded_at: ^utc_now(), + errors: concat_errors(j.errors, ^[Job.format_attempt(job)]) + ] + ) - Repo.update_all(conf, where(Job, id: ^job.id), updates) + Repo.update_all(conf, query, []) :ok end @impl Engine def error_job(%Config{} = conf, %Job{} = job, seconds) when is_integer(seconds) do - updates = [ - set: [state: "retryable", scheduled_at: seconds_from_now(seconds)], - push: [errors: Job.format_attempt(job)] - ] + query = + Job + |> where(id: ^job.id) + |> update([j], + set: [ + state: "retryable", + scheduled_at: ^seconds_from_now(seconds), + errors: concat_errors(j.errors, ^[Job.format_attempt(job)]) + ] + ) - Repo.update_all(conf, where(Job, id: ^job.id), updates) + Repo.update_all(conf, query, []) :ok end @@ -235,8 +255,11 @@ defmodule Oban.Engines.Basic do query = if is_map(job.unsaved_error) do update(query, [j], - set: [state: "cancelled", cancelled_at: ^utc_now()], - push: [errors: ^Job.format_attempt(job)] + set: [ + state: "cancelled", + cancelled_at: ^utc_now(), + errors: concat_errors(j.errors, ^[Job.format_attempt(job)]) + ] ) else query