Skip to content

Commit

Permalink
fix: honor max_concurrency option
Browse files Browse the repository at this point in the history
  • Loading branch information
zachdaniel committed Oct 17, 2023
1 parent 7301e09 commit 12840ae
Showing 1 changed file with 28 additions and 16 deletions.
44 changes: 28 additions & 16 deletions lib/ash/actions/create/bulk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -501,26 +501,38 @@ defmodule Ash.Actions.Create.Bulk do
max_concurrency = opts[:max_concurrency]

if max_concurrency && max_concurrency > 1 do
Task.async_stream(stream, fn
{:error, error} ->
# This is subpar, we shouldn't star tasks for errors
{:error, error}
Task.async_stream(
stream,
fn
{:error, error} ->
# This is subpar, we shouldn't star tasks for errors
{:error, error}

{:batch, batch} ->
Process.put(:ash_started_transaction?, true)
batch_result = callback.(batch)
new_notifications = Process.get(:ash_notifications, [])
{:batch, batch} ->
Process.put(:ash_started_transaction?, true)
batch_result = callback.(batch)
new_notifications = Process.get(:ash_notifications, [])

case batch_result do
{:ok, invalid, notifications} ->
{:ok, invalid, notifications ++ new_notifications}
case batch_result do
{:ok, invalid, notifications} ->
{:ok, invalid, notifications ++ new_notifications}

{:ok, results, invalid, notifications} ->
{:ok, results, invalid, notifications ++ new_notifications}
{:ok, results, invalid, notifications} ->
{:ok, results, invalid, notifications ++ new_notifications}

other ->
other
end
other ->
other
end
end,
timeout: :infinity,
max_concurrency: max_concurrency
)
|> Stream.map(fn
{:ok, value} ->
value

{:exit, error} ->
{:error, error}
end)
else
Stream.map(stream, fn
Expand Down

0 comments on commit 12840ae

Please sign in to comment.