From 12840ae2357501fcaf38ce8ec67fc1d9ac1321d9 Mon Sep 17 00:00:00 2001 From: Zach Daniel Date: Tue, 17 Oct 2023 12:24:28 -0400 Subject: [PATCH] fix: honor `max_concurrency` option --- lib/ash/actions/create/bulk.ex | 44 +++++++++++++++++++++------------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/lib/ash/actions/create/bulk.ex b/lib/ash/actions/create/bulk.ex index 27c27a60d..ce24e0987 100644 --- a/lib/ash/actions/create/bulk.ex +++ b/lib/ash/actions/create/bulk.ex @@ -501,26 +501,38 @@ defmodule Ash.Actions.Create.Bulk do max_concurrency = opts[:max_concurrency] if max_concurrency && max_concurrency > 1 do - Task.async_stream(stream, fn - {:error, error} -> - # This is subpar, we shouldn't star tasks for errors - {:error, error} + Task.async_stream( + stream, + fn + {:error, error} -> + # This is subpar, we shouldn't star tasks for errors + {:error, error} - {:batch, batch} -> - Process.put(:ash_started_transaction?, true) - batch_result = callback.(batch) - new_notifications = Process.get(:ash_notifications, []) + {:batch, batch} -> + Process.put(:ash_started_transaction?, true) + batch_result = callback.(batch) + new_notifications = Process.get(:ash_notifications, []) - case batch_result do - {:ok, invalid, notifications} -> - {:ok, invalid, notifications ++ new_notifications} + case batch_result do + {:ok, invalid, notifications} -> + {:ok, invalid, notifications ++ new_notifications} - {:ok, results, invalid, notifications} -> - {:ok, results, invalid, notifications ++ new_notifications} + {:ok, results, invalid, notifications} -> + {:ok, results, invalid, notifications ++ new_notifications} - other -> - other - end + other -> + other + end + end, + timeout: :infinity, + max_concurrency: max_concurrency + ) + |> Stream.map(fn + {:ok, value} -> + value + + {:exit, error} -> + {:error, error} end) else Stream.map(stream, fn