Skip to content

Commit

Permalink
Merge pull request #181 from NelsonVides/use_stored_worker_names
Browse files Browse the repository at this point in the history
Use stored worker names
  • Loading branch information
elbrujohalcon authored Nov 30, 2021
2 parents e8f74d3 + bb5c59d commit dcae5ca
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 15 deletions.
34 changes: 23 additions & 11 deletions src/wpool_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,18 @@

-export_type([wpool/0]).

-define(WPOOL_TABLE, ?MODULE).
-define(WPOOL_WORKERS, wpool_worker_names).

%% ===================================================================
%% API functions
%% ===================================================================
%% @doc Creates the ets table that will hold the information about active pools
-spec create_table() -> ok.
create_table() ->
_ = ets:new(?MODULE,
_ = ets:new(?WPOOL_TABLE,
[public, named_table, set, {read_concurrency, true}, {keypos, #wpool.name}]),
_ = ets:new(?WPOOL_WORKERS, [public, named_table, set, {read_concurrency, true}]),
ok.

%% @doc Starts a supervisor with several {@link wpool_process}es as its children
Expand Down Expand Up @@ -152,7 +156,8 @@ broadcast(Sup, Cast) ->

-spec all() -> [wpool:name()].
all() ->
[Name || #wpool{name = Name} <- ets:tab2list(?MODULE), find_wpool(Name) /= undefined].
[Name
|| #wpool{name = Name} <- ets:tab2list(?WPOOL_TABLE), find_wpool(Name) /= undefined].

%% @doc Retrieves the pool stats for all pools
-spec stats() -> [wpool:stats()].
Expand Down Expand Up @@ -231,11 +236,11 @@ task({_TaskId, Started, Task}) ->
%% @doc the number of workers in the pool
-spec wpool_size(atom()) -> non_neg_integer() | undefined.
wpool_size(Name) ->
try ets:update_counter(?MODULE, Name, {#wpool.size, 0}) of
try ets:update_counter(?WPOOL_TABLE, Name, {#wpool.size, 0}) of
WpoolSize ->
case erlang:whereis(Name) of
undefined ->
ets:delete(?MODULE, Name),
ets:delete(?WPOOL_TABLE, Name),
undefined;
_ ->
WpoolSize
Expand Down Expand Up @@ -361,6 +366,11 @@ init({Name, Options}) ->
%% @private
-spec worker_name(wpool:name(), pos_integer()) -> atom().
worker_name(Sup, I) ->
[{_, Worker}] = ets:lookup(?WPOOL_WORKERS, {Sup, I}),
Worker.

-spec build_worker_name(wpool:name(), pos_integer()) -> atom().
build_worker_name(Sup, I) ->
list_to_atom(?MODULE_STRING ++ [$- | atom_to_list(Sup)] ++ [$- | integer_to_list(I)]).

%% ===================================================================
Expand Down Expand Up @@ -435,20 +445,22 @@ all_workers(Wpool) ->
undefined ->
exit(no_workers);
_ ->
[wpool_pool:worker_name(Wpool, N) || N <- lists:seq(1, WPoolSize)]
[worker_name(Wpool, N) || N <- lists:seq(1, WPoolSize)]
end.

%% ===================================================================
%% ETS functions
%% ===================================================================
store_wpool(Wpool) ->
true = ets:insert(?MODULE, Wpool),
store_wpool(Wpool = #wpool{name = Name, size = Size}) ->
true = ets:insert(?WPOOL_TABLE, Wpool),
[ets:insert(?WPOOL_WORKERS, {{Name, I}, build_worker_name(Name, I)})
|| I <- lists:seq(1, Size)],
Wpool.

move_wpool(Name) ->
try
WpoolSize = ets:update_counter(?MODULE, Name, {#wpool.size, 0}),
ets:update_counter(?MODULE, Name, {#wpool.next, 1, WpoolSize, 1})
WpoolSize = ets:update_counter(?WPOOL_TABLE, Name, {#wpool.size, 0}),
ets:update_counter(?WPOOL_TABLE, Name, {#wpool.next, 1, WpoolSize, 1})
catch
_:badarg ->
case build_wpool(Name) of
Expand All @@ -462,11 +474,11 @@ move_wpool(Name) ->
%% @doc Use this function to get the Worker pool record in a custom worker.
-spec find_wpool(atom()) -> undefined | wpool().
find_wpool(Name) ->
try ets:lookup(?MODULE, Name) of
try ets:lookup(?WPOOL_TABLE, Name) of
[Wpool | _] ->
case erlang:whereis(Name) of
undefined ->
ets:delete(?MODULE, Name),
ets:delete(?WPOOL_TABLE, Name),
undefined;
_ ->
Wpool
Expand Down
13 changes: 9 additions & 4 deletions test/wpool_pool_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -470,24 +470,24 @@ ets_mess_up(_Config) ->
Pool = ets_mess_up,

ct:comment("Mess up with ets table..."),
true = ets:delete(wpool_pool, Pool),
ets_deletes(Pool),

ct:comment("Rebuild stats"),
1 = proplists:get_value(next_worker, wpool:stats(Pool)),

ct:comment("Mess up with ets table again..."),
true = ets:delete(wpool_pool, Pool),
ets_deletes(Pool),
{ok, ok} = wpool:call(Pool, {io, format, ["1!~n"]}, random_worker),

ct:comment("Mess up with ets table once more..."),
{ok, ok} = wpool:call(Pool, {io, format, ["2!~n"]}, next_worker),
2 = proplists:get_value(next_worker, wpool:stats(Pool)),
true = ets:delete(wpool_pool, Pool),
ets_deletes(Pool),
{ok, ok} = wpool:call(Pool, {io, format, ["3!~n"]}, next_worker),
1 = proplists:get_value(next_worker, wpool:stats(Pool)),

ct:comment("Mess up with ets table one final time..."),
true = ets:delete(wpool_pool, Pool),
ets_deletes(Pool),
_ = wpool_pool:find_wpool(Pool),

ct:comment("Now, delete the pool"),
Expand All @@ -509,6 +509,7 @@ ets_mess_up(_Config) ->

ct:comment("And now delete the ets table altogether"),
true = ets:delete(wpool_pool),
true = ets:delete(wpool_worker_names),
_ = wpool_pool:find_wpool(Pool),

wpool:stop(),
Expand Down Expand Up @@ -545,3 +546,7 @@ send_io_format(Pool) ->
worker_msg_queue_lengths(Pool) ->
lists:usort([proplists:get_value(message_queue_len, WS)
|| {_, WS} <- proplists:get_value(workers, wpool:stats(Pool))]).

ets_deletes(Pool) ->
true = ets:delete(wpool_pool, Pool),
[ets:delete(wpool_worker_names, {Pool, I}) || I <- lists:seq(1, ?WORKERS)].

0 comments on commit dcae5ca

Please sign in to comment.