diff --git a/lib/credo/check.ex b/lib/credo/check.ex index 64297f384..023b5f0ab 100644 --- a/lib/credo/check.ex +++ b/lib/credo/check.ex @@ -393,8 +393,11 @@ defmodule Credo.Check do defp do_run_on_all_source_files(exec, source_files, params) do source_files - |> Enum.map(&Task.async(fn -> run_on_source_file(exec, &1, params) end)) - |> Enum.each(&Task.await(&1, :infinity)) + |> Task.async_stream(fn source -> run_on_source_file(exec, source, params) end, + max_concurrency: exec.max_concurrent_check_runs, + timeout: :infinity + ) + |> Stream.run() :ok end diff --git a/lib/credo/check/runner.ex b/lib/credo/check/runner.ex index e6c3574c8..99f3bd794 100644 --- a/lib/credo/check/runner.ex +++ b/lib/credo/check/runner.ex @@ -19,9 +19,14 @@ defmodule Credo.Check.Runner do |> warn_about_ineffective_patterns(exec) |> fix_deprecated_notation_for_checks_without_params() - Credo.Check.Worker.run(check_tuples, exec.max_concurrent_check_runs, fn check_tuple -> - run_check(exec, check_tuple) - end) + check_tuples + |> Task.async_stream( + fn check_tuple -> + run_check(exec, check_tuple) + end, + timeout: :infinity + ) + |> Stream.run() :ok end diff --git a/lib/credo/check/worker.ex b/lib/credo/check/worker.ex deleted file mode 100644 index 1652e85e3..000000000 --- a/lib/credo/check/worker.ex +++ /dev/null @@ -1,126 +0,0 @@ -defmodule Credo.Check.Worker do - @moduledoc false - - @doc """ - Runs all members of `workloads` using ``. - """ - def run(workloads, max_concurrency, work_fn) do - {:ok, server_pid} = GenServer.start_link(__MODULE__.Server, workloads) - - worker_context = %{ - runner_pid: self(), - server_pid: server_pid, - max_concurrency: max_concurrency, - work_fn: work_fn, - results: [] - } - - outer_loop(worker_context, 0) - end - - @doc """ - Called when a workload has finished. - """ - def send_workload_finished_to_runner(worker_context, _workload, result) do - send(worker_context.runner_pid, {self(), {:workload_finished, result}}) - end - - defp outer_loop(worker_context, taken) do - available = worker_context.max_concurrency - taken - - cond do - available <= 0 -> - wait_for_workload_finished(worker_context, taken) - - taken_workloads = __MODULE__.Server.take_workloads(worker_context.server_pid, available) -> - inner_loop(worker_context, taken_workloads, taken) - - # we fall thru here if there are no checks left - # there are two options: we are done ... - taken == 0 -> - {:ok, worker_context.results} - - # ... or we need for the very last batch to finish up - true -> - wait_for_workload_finished(worker_context, taken) - end - end - - defp wait_for_workload_finished(worker_context, taken) do - receive do - {_spawned_pid, {:workload_finished, result}} -> - # IO.puts("Finished #{workload}") - new_worker_context = %{worker_context | results: [result | worker_context.results]} - - outer_loop(new_worker_context, taken - 1) - end - end - - defp inner_loop(worker_context, [], taken) do - outer_loop(worker_context, taken) - end - - defp inner_loop(worker_context, [workload | rest], taken) do - spawn_fn = fn -> - result = worker_context.work_fn.(workload) - - send_workload_finished_to_runner(worker_context, workload, result) - end - - spawn_link(spawn_fn) - - inner_loop(worker_context, rest, taken + 1) - end - - defmodule Server do - @moduledoc false - @timeout :infinity - - use GenServer - - def take_workloads(pid, count) do - GenServer.call(pid, {:take_workloads, count}, @timeout) - end - - # - # Server - # - - @impl true - def init(workloads) do - state = %{ - waiting: nil, - workloads: workloads - } - - {:ok, state} - end - - @impl true - def handle_call({:take_workloads, count}, from, %{waiting: nil} = state) do - {:noreply, take_workloads(%{state | waiting: {from, count}})} - end - - defp take_workloads(%{waiting: nil} = state) do - state - end - - defp take_workloads(%{waiting: {from, _count}, workloads: []} = state) do - GenServer.reply(from, nil) - - %{state | waiting: nil} - end - - defp take_workloads(%{workloads: []} = state) do - state - end - - defp take_workloads(%{waiting: {from, count}, workloads: workloads} = state) do - {reply, workloads} = Enum.split(workloads, count) - - GenServer.reply(from, reply) - - %{state | workloads: workloads, waiting: nil} - end - end -end diff --git a/lib/credo/execution.ex b/lib/credo/execution.ex index 506ddcf0f..005d62d42 100644 --- a/lib/credo/execution.ex +++ b/lib/credo/execution.ex @@ -42,13 +42,15 @@ defmodule Credo.Execution do enable_disabled_checks: nil, ignore_checks_tags: [], ignore_checks: nil, - max_concurrent_check_runs: nil, min_priority: 0, mute_exit_status: false, only_checks_tags: [], only_checks: nil, read_from_stdin: false, + # This is no longer used, but we keep it so existing plugins that use it don't break + max_concurrent_check_runs: nil, + # state, which is accessed and changed over the course of Credo's execution pipeline_map: %{}, commands: %{},