Skip to content


Add unordered version of parallel map
Browse files Browse the repository at this point in the history
  • Loading branch information
seriyps committed Oct 2, 2024
1 parent b8a81f8 commit b51b5e6
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 41 deletions.
119 changes: 89 additions & 30 deletions src/iterator_pmap.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
%% It starts workers immediately, not on-demand. It kills workers when there is no more work or
%% as `close' cleanup.
%% While input is processed in parallel, the original order is preserved in the output.
%% The `ordered' option controls if the order of elements in the resulting iterator should match
%% the order of input iterator.
%% - If `ordered' is `true', the order is guaranteed, but pool utilization might be not optimal if
%% the time to process single element is not uniform (head-of-line blocking).
%% - If `ordered' is `false', the order is not guaranteed, but pool utilization is optimal. Keep in
%% mind however that single worker can stay busy far beyond the `recv_timeout'.
%% It prioritizes that all the workers are busy over returning the result immediately (so
%% it does not return a result untill all workers are busy or inner iterator is depleted).
Expand All @@ -16,26 +21,30 @@

-type tag() :: any().

-record(state, {
state :: normal | final,
recv_timeout :: timeout(),
tag :: tag(),
free :: [pid()],
busy :: queue:queue(pid()),
busy :: {ordered, queue:queue(pid())} | {unordered, sets:set(pid())},
inner_i :: iterator:iterator(any()) | undefined

%% @doc If your pmap has crashed and you had to catch the error, you can use this function to
%% flush the results from the workers. But it is recommended to not catch the error and crash.
%% If error is catched, there is a risk that workers are not killed.
-spec flush() -> [pid()].
flush() ->
-spec flush(tag()) -> [pid()].
flush(Tag) ->
{?MODULE, Pid, _} ->
[Pid | flush()]
{Tag, Pid, _} ->
[Pid | flush(Tag)]
after 0 ->
Expand All @@ -52,26 +61,38 @@ pmap(F, I) ->
%% read-ahead from the input iterator</li>
%% <li>`recv_timeout' (default: infinity) - timeout for receiving result from worker, if
%% reached, the pool will be shut down and `timeout' error is generated</li>
%% <li>`ordered' (default: true) - if `true', the order of elements in the resulting iterator
%% will match the order of input iterator; otherwise order is not guaranteed, but worker
%% pool utilization will be better if task size is not uniform</li>
%% <li>`tag' (default: `erlang:make_ref()') - unique tag to identify the
%% pool (see {@link flush/1}). Make sure it is small, because it is sent between processes
%% a lot</li>
%% </ul>
-spec pmap(
fun((InType) -> OutType),
concurrency => pos_integer(),
recv_timeout => timeout()
recv_timeout => timeout(),
ordered => boolean(),
tag => tag()
) -> iterator:iterator(OutType) when
InType :: any(),
OutType :: any().
pmap(F, I, Opts) ->
Concurrency = maps:get(concurrency, Opts, 10),
RecvTimeout = maps:get(recv_timeout, Opts, infinity),
Workers = launch(F, Concurrency),
Ordered = maps:get(ordered, Opts, true),
Tag = maps:get(tag, Opts, erlang:make_ref()),
Workers = launch(F, Tag, Concurrency),
Busy = busy_init(Ordered),
St = #state{
state = normal,
recv_timeout = RecvTimeout,
tag = Tag,
free = Workers,
busy = queue:new(),
busy = Busy,
inner_i = I
iterator:new(fun yield_next/1, St, fun shutdown/1).
Expand All @@ -80,9 +101,11 @@ yield_next(#state{state = normal, free = [_ | _]} = Pool) ->
%% The inner iterator is not yet exhausted and there are free workers:
%% push the next value to the worker as long as there are workers and values.
yield_next(#state{state = normal, free = [], busy = Busy, recv_timeout = Timeout} = Pool) ->
#state{state = normal, tag = Tag, free = [], busy = Busy, recv_timeout = Timeout} = Pool
) ->
%% All workers are busy, wait for the next result.
case recv(Busy, Timeout) of
case recv(Busy, Tag, Timeout) of
%% We can't get `empty' here, because we have at least one busy worker.
{Result, Pid, Busy1} ->
Pool1 = push(Pool#state{
Expand All @@ -94,9 +117,9 @@ yield_next(#state{state = normal, free = [], busy = Busy, recv_timeout = Timeout
yield_next(#state{state = final, busy = Busy, recv_timeout = Timeout} = Pool) ->
yield_next(#state{state = final, tag = Tag, busy = Busy, recv_timeout = Timeout} = Pool) ->
%% The inner iterator is exhausted, wait for the remaining results.
case recv(Busy, Timeout) of
case recv(Busy, Tag, Timeout) of
{Result, Pid, Busy1} ->
true = unlink(Pid),
exit(Pid, shutdown),
Expand All @@ -112,46 +135,82 @@ yield_next(#state{state = final, busy = Busy, recv_timeout = Timeout} = Pool) ->
%% Pool API

push(#state{state = normal, free = [Pid | Free], busy = Busy, inner_i = I} = Pool) ->
busy_init(true) ->
{ordered, queue:new()};
busy_init(false) ->
{unordered, sets_new()}.

busy_push(Pid, {ordered, Queue}) ->
{ordered, queue:in(Pid, Queue)};
busy_push(Pid, {unordered, Set}) ->
{unordered, sets:add_element(Pid, Set)}.

busy_to_list({ordered, Queue}) ->
busy_to_list({unordered, Set}) ->

-if(?OTP_RELEASE >= 24).
sets_new() -> sets:new([{version, 2}]).
sets_new() -> sets:new().

push(#state{state = normal, tag = Tag, free = [Pid | Free], busy = Busy, inner_i = I} = Pool) ->
case iterator:next(I) of
{ok, Val, I1} ->
Pid ! {?MODULE, self(), {next, Val}},
Pid ! {Tag, self(), {next, Val}},
free = Free,
busy = queue:in(Pid, Busy),
busy = busy_push(Pid, Busy),
inner_i = I1
done ->
%% TODO: shutdown `free` here
Pool#state{state = final, inner_i = undefined}
push(#state{state = normal, free = []} = Pool) ->

recv(Busy, Timeout) ->
recv({ordered, Busy}, Tag, Timeout) ->
%% The order is guaranteed by using selective receive that matches by worker pid.
case queue:out(Busy) of
{{value, Pid}, Busy1} ->
{?MODULE, Pid, {result, Result}} ->
{Result, Pid, Busy1}
{Tag, Pid, {result, Result}} ->
{Result, Pid, {ordered, Busy1}}
after Timeout ->
{empty, _} ->
recv({unordered, Busy}, Tag, Timeout) ->
%% Order is not guaranteed because we receive first ready result
case sets:is_empty(Busy) of
true ->
false ->
{Tag, Pid, {result, Result}} ->
{Result, Pid, {unordered, sets:del_element(Pid, Busy)}}
after Timeout ->

%% Pool management

launch(F, Concurrency) ->
launch(F, Tag, Concurrency) ->
spawn_link(?MODULE, loop, [self(), F])
spawn_link(?MODULE, loop, [self(), F, Tag])
|| _ <- lists:seq(1, Concurrency)

shutdown(#state{free = Free, busy = Busy}) ->
Pids = Free ++ queue:to_list(Busy),
shutdown(#state{free = Free, tag = Tag, busy = Busy}) ->
Pids = Free ++ busy_to_list(Busy),
fun(Pid) ->
Expand All @@ -162,7 +221,7 @@ shutdown(#state{free = Free, busy = Busy}) ->
%% Flush potential `result' messages from workers
{?MODULE, Pid, {result, _}} -> ok
{Tag, Pid, {result, _}} -> ok
after 0 ->
Expand All @@ -171,9 +230,9 @@ shutdown(#state{free = Free, busy = Busy}) ->

%% @private
loop(Parent, F) ->
loop(Parent, F, Tag) ->
{?MODULE, Parent, {next, I}} ->
Parent ! {?MODULE, self(), {result, F(I)}},
?MODULE:loop(Parent, F)
{Tag, Parent, {next, I}} ->
Parent ! {Tag, self(), {result, F(I)}},
?MODULE:loop(Parent, F, Tag)

0 comments on commit b51b5e6

Please sign in to comment.