Skip to content

Commit

Permalink
Add parallel map iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
seriyps committed Sep 16, 2024
1 parent 2b4e62a commit 0201b9b
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 0 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ With this code, using `iterator`, we managed to go through the whole file never
a single line in-memory but were able to work with it using the same code style and high-order
functions as what we would use if we read all the file lines in memory.

For the full list of helper functions, see `iterator.erl`; the naming follows that of the
OTP `lists` module.

The functions `iterator_pmap:pmap/2` and `iterator_pmap:pmap/3` provide a parallel version
of `iterator:map/2`: they take an iterator as input and return a new iterator in which the map
function is executed for each input element in parallel on a pool of worker processes.
Although the input elements are processed in parallel, their original order is preserved.

## Setup

Add it to your `rebar.config`
Expand Down
163 changes: 163 additions & 0 deletions src/iterator_pmap.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
%% @doc Parallel map implementation over iterator.
%%
%% It starts workers immediately, not on-demand. It kills workers when there is no more work or
%% as `close' cleanup.
%%
%% While input is processed in parallel, the original order is preserved in the output.
%%
%% It prioritizes keeping all the workers busy over returning the result immediately (so
%% it does not return a result until all workers are busy or the inner iterator is depleted).
%%
%% When recv_timeout is not infinity, a `timeout' exception may be raised and no worker
%% cleanup is done. It is recommended to not catch this error and crash the calling process.
%%
%% Worker processes are linked to the caller process.
-module(iterator_pmap).

-export([
pmap/2,
pmap/3,
flush/0
]).
-export([loop/2]).

%% Pool state threaded through the iterator callbacks (`yield_next/1', `shutdown/1').
-record(state, {
%% `normal' while the inner iterator may still yield values;
%% `final' once `iterator:next/1' on it has returned `done'.
state :: normal | final,
%% How long to wait for a single worker result (used as the `receive ... after' timeout).
recv_timeout :: timeout(),
%% Idle workers that can be handed the next input value.
free :: [pid()],
%% Workers with an outstanding task, queued in dispatch order (oldest first), which is
%% also the order in which their results are collected.
busy :: queue:queue(pid()),
%% The source iterator; replaced with `undefined' once it is exhausted.
inner_i :: iterator:iterator(any()) | undefined
}).

%% @doc Drains all pending `{?MODULE, Pid, _}' messages from the mailbox of the calling process
%% without blocking and returns the sender pids in the order the messages were queued.
%%
%% Intended for the case when pmap has crashed and the error had to be caught: leftover worker
%% results can be removed from the mailbox with this function. It is still recommended to not
%% catch the error and to let the calling process crash, because if the error is caught there is
%% a risk that the workers are not killed.
-spec flush() -> [pid()].
flush() ->
    flush([]).

%% Collects sender pids in reverse and restores mailbox order once the mailbox has no more
%% matching messages.
flush(Seen) ->
    receive
        {?MODULE, Worker, _Payload} ->
            flush([Worker | Seen])
    after 0 ->
        lists:reverse(Seen)
    end.

%% @doc Parallel map over the iterator `I' with default options.
%%
%% Equivalent to `pmap(F, I, #{})', i.e. `concurrency' defaults to 10 workers and
%% `recv_timeout' defaults to `infinity'.
-spec pmap(fun((InType) -> OutType), iterator:iterator(InType)) -> iterator:iterator(OutType) when
    InType :: any(),
    OutType :: any().
pmap(F, I) ->
    pmap(F, I, #{}).

%% @doc Returns an iterator that applies `F' to every element of `I' on a pool of worker
%% processes while keeping the original element order.
%%
%% Options:
%% <ul>
%%   <li>`concurrency' - number of worker processes to start (default 10);</li>
%%   <li>`recv_timeout' - how long to wait for a single worker result (default `infinity').</li>
%% </ul>
%% The workers are started right away and are linked to the calling process.
-spec pmap(
    fun((InType) -> OutType),
    iterator:iterator(InType),
    #{
        concurrency => pos_integer(),
        recv_timeout => timeout()
    }
) -> iterator:iterator(OutType) when
    InType :: any(),
    OutType :: any().
pmap(F, I, Opts) ->
    PoolSize = maps:get(concurrency, Opts, 10),
    WaitFor = maps:get(recv_timeout, Opts, infinity),
    %% All workers start out idle; nothing is dispatched until the first element is requested.
    Pool = #state{
        state = normal,
        recv_timeout = WaitFor,
        free = launch(F, PoolSize),
        busy = queue:new(),
        inner_i = I
    },
    iterator:new(fun yield_next/1, Pool, fun shutdown/1).

%% Iterator "yield" callback: produces the next mapped value, or `done'.
%%
%% Returns `{Result, NewPool}' or `done'. Raises `error(timeout)' (after shutting the workers
%% down) when a worker result does not arrive within `recv_timeout', regardless of whether the
%% inner iterator is already exhausted or not.
yield_next(#state{state = normal, free = [_ | _]} = Pool) ->
    %% The inner iterator is not yet exhausted and there are free workers:
    %% push the next value to the worker as long as there are workers and values.
    yield_next(push(Pool));
yield_next(#state{state = normal, free = [], busy = Busy, recv_timeout = Timeout} = Pool) ->
    %% All workers are busy, wait for the result of the oldest dispatched task.
    case recv(Busy, Timeout) of
        {Result, Pid, Busy1} ->
            %% The worker that has just answered is idle again: try to give it more work
            %% before handing the result to the consumer.
            Pool1 = push(Pool#state{
                free = [Pid],
                busy = Busy1
            }),
            {Result, Pool1};
        timeout ->
            %% Same handling as in the `final' state; previously this case crashed with
            %% `{badmatch, timeout}' instead of the documented `timeout' error.
            shutdown(Pool),
            error(timeout)
    end;
yield_next(#state{state = final, busy = Busy, recv_timeout = Timeout} = Pool) ->
    %% The inner iterator is exhausted, wait for the remaining results.
    case recv(Busy, Timeout) of
        {Result, Pid, Busy1} ->
            %% No more work will ever be available for this worker: stop it right away.
            true = unlink(Pid),
            exit(Pid, shutdown),
            {Result, Pool#state{busy = Busy1}};
        timeout ->
            shutdown(Pool),
            error(timeout);
        empty ->
            done
    end.

%%
%% Pool API
%%

%% Hands values of the inner iterator over to idle workers, one value per worker, until either
%% no idle worker is left or the inner iterator is exhausted. In the latter case the pool is
%% switched to the `final' state and the inner iterator is dropped.
push(#state{state = normal, free = []} = Pool) ->
    %% Every worker has a task already.
    Pool;
push(#state{state = normal, free = [Worker | Idle], busy = InFlight, inner_i = Source} = Pool) ->
    case iterator:next(Source) of
        done ->
            Pool#state{state = final, inner_i = undefined};
        {ok, Item, Source1} ->
            Worker ! {?MODULE, self(), {next, Item}},
            %% Remember the dispatch order so that results are collected in the same order.
            push(Pool#state{
                free = Idle,
                busy = queue:in(Worker, InFlight),
                inner_i = Source1
            })
    end.

%% Waits for the result of the oldest dispatched task.
%%
%% Returns `{Result, Pid, RestOfBusyQueue}' when the worker at the head of the queue answered,
%% `timeout' when it did not answer within `Timeout', and `empty' when no task is in flight.
recv(Busy, Timeout) ->
    case queue:out(Busy) of
        {empty, _} ->
            empty;
        {{value, Oldest}, Rest} ->
            await_result(Oldest, Rest, Timeout)
    end.

%% Selectively receives the result message sent by `Worker' only; results of other workers stay
%% in the mailbox until their turn comes.
await_result(Worker, Rest, Timeout) ->
    receive
        {?MODULE, Worker, {result, Value}} ->
            {Value, Worker, Rest}
    after Timeout ->
        timeout
    end.

%%
%% Pool management
%%

%% Starts `Concurrency' worker processes linked to the caller; each of them runs `loop/2' with
%% the caller's pid and the map function `F'. Returns the list of worker pids.
launch(F, Concurrency) ->
    Parent = self(),
    StartWorker = fun(_N) -> spawn_link(?MODULE, loop, [Parent, F]) end,
    lists:map(StartWorker, lists:seq(1, Concurrency)).

%% Iterator "close" callback: stops every worker of the pool (idle and busy) and removes the
%% results they may have left in the mailbox of the calling process. Always returns `ok'.
%%
%% `exit/2' is asynchronous, so a worker that is still executing the map function may deliver
%% its result after the exit signal was sent. To not leave such a late result in the caller's
%% mailbox, every worker is monitored and the flush is done only after its `DOWN' message has
%% arrived: at that point everything the worker has ever sent is already in the mailbox.
shutdown(#state{free = Free, busy = Busy}) ->
    Pids = Free ++ queue:to_list(Busy),
    %% First ask all the workers to stop, so that they terminate concurrently.
    Monitored = lists:map(
        fun(Pid) ->
            unlink(Pid),
            MRef = erlang:monitor(process, Pid),
            exit(Pid, shutdown),
            {Pid, MRef}
        end,
        Pids
    ),
    lists:foreach(
        fun({Pid, MRef}) ->
            %% An already dead worker produces an immediate `DOWN' (reason `noproc').
            receive
                {'DOWN', MRef, process, Pid, _Reason} -> ok
            end,
            %% A worker has at most one task in flight, so at most one `result' message.
            receive
                {?MODULE, Pid, {result, _}} -> ok
            after 0 ->
                ok
            end
        end,
        Monitored
    ),
    ok.

%% Worker process body: waits for a `next' request from `Parent', applies `F' to the received
%% value, sends the outcome back as a `result' message and starts over. The recursive call is
%% fully qualified, so the worker switches to a newly loaded version of the module.
loop(Parent, F) ->
    receive
        {?MODULE, Parent, {next, Input}} ->
            Output = F(Input),
            Parent ! {?MODULE, self(), {result, Output}}
    end,
    ?MODULE:loop(Parent, F).
136 changes: 136 additions & 0 deletions test/prop_pmap.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
%% @doc Property-based tests for pmap
-module(prop_pmap).

%% Tests
-export([
prop_order_is_preserved/0,
prop_vs_lists/0,
prop_interrupted/0
]).

-include_lib("proper/include/proper.hrl").
-include_lib("stdlib/include/assert.hrl").

%% @doc Mapping the identity function over a list iterator with pmap yields the very same list,
%% leaves no worker messages in the mailbox and no extra links on the test process.
prop_order_is_preserved() ->
    TimeoutGen = proper_types:oneof([
        infinity,
        proper_types:integer(1000, 10000)
    ]),
    ?FORALL(
        {Items, RecvTimeout, Concurrency},
        {proper_types:list(), TimeoutGen, proper_types:pos_integer()},
        begin
            Identity = fun(X) -> X end,
            Source = iterator:from_list(Items),
            %% Snapshot of the links before any worker is started
            LinksBefore = links(),
            Opts = #{recv_timeout => RecvTimeout, concurrency => Concurrency},
            Mapped = iterator_pmap:pmap(Identity, Source, Opts),
            ?assertEqual(Items, iterator:to_list(Mapped)),
            ?assertEqual([], iterator_pmap:flush()),
            assert_links(LinksBefore),
            true
        end
    ).

%% @doc pmap over a list iterator produces the same result as `lists:map/2' over the list,
%% leaves no worker messages in the mailbox and no extra links on the test process.
prop_vs_lists() ->
    TimeoutGen = proper_types:oneof([
        infinity,
        proper_types:integer(1000, 10000)
    ]),
    ?FORALL(
        {Ints, RecvTimeout, Concurrency},
        {proper_types:list(proper_types:integer()), TimeoutGen, proper_types:pos_integer()},
        begin
            Inc = fun(N) -> N + 1 end,
            Source = iterator:from_list(Ints),
            %% Snapshot of the links before any worker is started
            LinksBefore = links(),
            Opts = #{recv_timeout => RecvTimeout, concurrency => Concurrency},
            Mapped = iterator_pmap:pmap(Inc, Source, Opts),
            Expected = lists:map(Inc, Ints),
            ?assertEqual(Expected, iterator:to_list(Mapped)),
            ?assertEqual([], iterator_pmap:flush()),
            assert_links(LinksBefore),
            true
        end
    ).

%% @doc When only a prefix of the pmap iterator is consumed, the consumed part is still correct
%% and the workers are cleaned up: no stray messages and no extra links remain.
prop_interrupted() ->
    ListAndConcurrency = {
        proper_types:non_empty(proper_types:list(proper_types:integer())),
        proper_types:pos_integer()
    },
    %% The number of elements to take depends on the generated list: 1..length(list)
    WithTakeN = ?LET(
        {Ints, Concurrency},
        ListAndConcurrency,
        {Ints, Concurrency, proper_types:integer(1, length(Ints))}
    ),
    ?FORALL(
        {Ints, Concurrency, HowMany},
        WithTakeN,
        begin
            Inc = fun(N) -> N + 1 end,
            Source = iterator:from_list(Ints),
            %% Snapshot of the links before any worker is started
            LinksBefore = links(),
            Mapped = iterator_pmap:pmap(Inc, Source, #{concurrency => Concurrency}),
            Prefix = iterator:sublist(Mapped, HowMany),
            Expected = lists:sublist(lists:map(Inc, Ints), HowMany),
            ?assertEqual(Expected, iterator:to_list(Prefix)),
            ?assertEqual([], iterator_pmap:flush()),
            assert_links(LinksBefore),
            true
        end
    ).

%% Returns the sorted list of everything the calling process is currently linked to.
links() ->
    {links, Linked} = erlang:process_info(self(), links),
    lists:sort(Linked).

%% Asserts that the calling process has exactly the links recorded in `Links0'. On failure the
%% assertion comment lists every unexpected linked process together with its `process_info'.
assert_links(Links0) ->
    Current = links(),
    ?assertEqual(
        Links0,
        Current,
        [
            {extra,
                lists:map(
                    fun(P) -> {P, erlang:process_info(P)} end,
                    ordsets:subtract(Current, Links0)
                )}
        ]
    ).

0 comments on commit 0201b9b

Please sign in to comment.