Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use effects to initialize new projections #255

Merged
merged 3 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions src/khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
register_projection/2, register_projection/3, register_projection/4,
unregister_projection/1, unregister_projection/2,
unregister_projection/3,
has_projection/1, has_projection/2, has_projection/3,

%% Transactions; `khepri_tx' provides the API to use inside
%% transaction functions.
Expand Down Expand Up @@ -3026,6 +3027,85 @@ unregister_projection(StoreId, ProjectionName, Options)
is_map(Options) ->
khepri_machine:unregister_projection(StoreId, ProjectionName, Options).

%% -------------------------------------------------------------------
%% has_projection().
%% -------------------------------------------------------------------

-spec has_projection(ProjectionName) -> Ret when
ProjectionName :: atom(),
Ret :: boolean() | khepri:error().
%% @doc Determines whether the store has a projection registered with the given
%% name.
%%
%% Calling this function is the same as calling
%% `has_projection(StoreId, ProjectionName)' with the default store ID
%% (see {@link khepri_cluster:get_default_store_id/0}).
%%
%% @see has_projection/2.

has_projection(ProjectionName) when is_atom(ProjectionName) ->
StoreId = khepri_cluster:get_default_store_id(),
has_projection(StoreId, ProjectionName).

-spec has_projection
(StoreId, ProjectionName) -> Ret when
StoreId :: khepri:store_id(),
ProjectionName :: atom(),
Ret :: boolean() | khepri:error();
(ProjectionName, Options) -> Ret when
ProjectionName :: atom(),
Options :: khepri:query_options(),
Ret :: boolean() | khepri:error().
%% @doc Determines whether the store has a projection registered with the given
%% name.
%%
%% This function accepts the following two forms:
%% <ul>
%% <li>`has_projection(StoreId, ProjectionName)'. Calling it is the same
%% as calling `has_projection(StoreId, ProjectionName, #{})'.</li>
%% <li>`has_projection(ProjectionName, Options)'. Calling it is the same
%% as calling `has_projection(StoreId, ProjectionName, Options)' with
%% the default store ID (see {@link khepri_cluster:get_default_store_id/0}).
%% </li>
%% </ul>
%%
%% @see has_projection/3.

has_projection(StoreId, ProjectionName)
when ?IS_KHEPRI_STORE_ID(StoreId) andalso is_atom(ProjectionName) ->
has_projection(StoreId, ProjectionName, #{});
has_projection(ProjectionName, Options)
when is_atom(ProjectionName) andalso is_map(Options) ->
StoreId = khepri_cluster:get_default_store_id(),
has_projection(StoreId, ProjectionName, Options).

-spec has_projection(StoreId, ProjectionName, Options) ->
Ret when
StoreId :: khepri:store_id(),
ProjectionName :: atom(),
Options :: khepri:query_options(),
Ret :: boolean() | khepri:error().
%% @doc Determines whether the store has a projection registered with the given
%% name.
%%
%% @param StoreId the name of the Khepri store.
%% @param ProjectionName the name of the projection to has as passed to
%% {@link khepri_projection:new/3}.
%% @param Options query options.
%% @returns `true' if the store contains a projection registered with the given
%% name, `false' if it does not, or an `{error, Reason}' tuple if the query
%% failed.

has_projection(StoreId, ProjectionName, Options)
when ?IS_KHEPRI_STORE_ID(StoreId) andalso is_atom(ProjectionName) andalso
is_map(Options) ->
case khepri_machine:get_projections_state(StoreId, Options) of
{ok, ProjectionTree} ->
khepri_machine:has_projection(ProjectionTree, ProjectionName);
{error, _} = Error ->
Error
end.

%% -------------------------------------------------------------------
%% transaction().
%% -------------------------------------------------------------------
Expand Down
90 changes: 69 additions & 21 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
get_triggers/1,
get_emitted_triggers/1,
get_projections/1,
has_projection/2,
get_metrics/1]).
-ifdef(TEST).
-export([make_virgin_state/1]).
Expand Down Expand Up @@ -1121,6 +1122,13 @@ handle_aux(
Projection <- Projections]
end),
{no_reply, AuxState, LogState};
handle_aux(
_RaState, cast,
#restore_projection{projection = Projection, pattern = PathPattern},
AuxState, LogState, State) ->
Tree = get_tree(State),
ok = restore_projection(Projection, Tree, PathPattern),
{no_reply, AuxState, LogState};
handle_aux(_RaState, _Type, _Command, AuxState, LogState, _MachineState) ->
{no_reply, AuxState, LogState}.

Expand Down Expand Up @@ -1207,27 +1215,34 @@ apply(
Meta,
#register_projection{pattern = PathPattern, projection = Projection},
State) ->
Tree = get_tree(State),
ProjectionName = khepri_projection:name(Projection),
ProjectionTree = get_projections(State),
Reply = khepri_projection:init(Projection),
State1 = case Reply of
ok ->
restore_projection(Projection, Tree, PathPattern),
ProjectionTree1 = khepri_pattern_tree:update(
ProjectionTree,
PathPattern,
fun (?NO_PAYLOAD) ->
[Projection];
(Projections) ->
[Projection | Projections]
end),
erase(compiled_projection_tree),
set_projections(State, ProjectionTree1);
_ ->
State
end,
Ret = {State1, Reply},
bump_applied_command_count(Ret, Meta);
case has_projection(ProjectionTree, ProjectionName) of
true ->
Info = #{name => ProjectionName},
Reason = ?khepri_error(projection_already_exists, Info),
Reply = {error, Reason},
Ret = {State, Reply},
bump_applied_command_count(Ret, Meta);
false ->
ProjectionTree1 = khepri_pattern_tree:update(
ProjectionTree,
PathPattern,
fun (?NO_PAYLOAD) ->
[Projection];
(Projections) ->
[Projection | Projections]
end),
%% The new projection has been registered so the cached compiled
%% projection tree needs to be erased.
clear_compiled_projection_tree(),
State1 = set_projections(State, ProjectionTree1),
AuxEffect = #restore_projection{projection = Projection,
pattern = PathPattern},
Effects = [{aux, AuxEffect}],
Ret = {State1, ok, Effects},
bump_applied_command_count(Ret, Meta)
end;
apply(
Meta,
#unregister_projection{name = ProjectionName},
Expand Down Expand Up @@ -1259,7 +1274,7 @@ apply(
Reason = ?khepri_error(projection_not_found, Info),
{error, Reason};
_ ->
erase(compiled_projection_tree),
clear_compiled_projection_tree(),
ok
end,
State1 = set_projections(State, ProjectionTree1),
Expand Down Expand Up @@ -1766,6 +1781,20 @@ get_compiled_projection_tree(SourceProjectionTree) ->
CompiledProjectionTree
end.

-spec clear_compiled_projection_tree() -> ok.
%% @doc Erases the cached projection tree.
%%
%% This function should be called whenever the projection tree is changed:
%% whenever a projection is registered or unregistered.
%%
%% @see get_compiled_projection_tree/1.
%%
%% @private

clear_compiled_projection_tree() ->
erase(compiled_projection_tree),
ok.

%% -------------------------------------------------------------------
%% State record management functions.
%% -------------------------------------------------------------------
Expand Down Expand Up @@ -1907,6 +1936,25 @@ set_emitted_triggers(#khepri_machine{} = State, EmittedTriggers) ->
get_projections(#khepri_machine{projections = Projections}) ->
Projections.

-spec has_projection(ProjectionTree, ProjectionName) -> boolean() when
ProjectionTree :: khepri_machine:projection_tree(),
ProjectionName :: atom().
%% @doc Determines if the given projection tree contains a projection.
%%
%% Two projections are considered equal if they have the same name.
%%
%% @private

has_projection(ProjectionTree, Name) when is_atom(Name) ->
khepri_pattern_tree:any(
ProjectionTree,
fun(Projections) ->
lists:any(
fun(#khepri_projection{name = N}) ->
N =:= Name
end, Projections)
end).

-spec set_projections(State, Projections) -> NewState when
State :: khepri_machine:state(),
Projections :: khepri_machine:projection_tree(),
Expand Down
3 changes: 3 additions & 0 deletions src/khepri_machine.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,6 @@
old_props :: khepri:node_props(),
new_props :: khepri:node_props(),
projection :: khepri_projection:projection()}).

-record(restore_projection, {pattern :: khepri_path:native_pattern(),
projection :: khepri_projection:projection()}).
23 changes: 22 additions & 1 deletion src/khepri_pattern_tree.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

-type update_fun(Payload) :: fun((Payload) -> Payload).

-type find_fun(Payload) :: fun((Payload) -> boolean()).

-export_type([tree_node/1,
tree/1]).

Expand All @@ -51,7 +53,8 @@
fold/5,
foreach/2,
compile/1,
filtermap/2]).
filtermap/2,
any/2]).

-spec empty() -> TreeNode when
TreeNode :: khepri_pattern_tree:tree(Payload),
Expand Down Expand Up @@ -331,3 +334,21 @@ filtermap1(
end
end, ChildNodes0),
#pattern_node{payload = Payload, child_nodes = ChildNodes}.

-spec any(PatternTree, FindFun) -> Ret when
PatternTree :: khepri_pattern_tree:tree(Payload),
FindFun :: find_fun(Payload),
Ret :: payload() | undefined,
Payload :: payload().
%% @doc Determines whether the pattern tree contains a tree node with a payload
%% that matches the given predicate.
%%
%% @param PatternTree the tree to search.
%% @param FindFun the predicate with which to evaluate tree nodes.
%% @returns `true' if the predicate evaluates as `true' for any payload,
%% `false' otherwise.

any(#pattern_node{payload = Payload, child_nodes = Children}, FindFun) ->
(Payload =/= ?NO_PAYLOAD andalso FindFun(Payload)) orelse
lists:any(
fun(Child) -> any(Child, FindFun) end, maps:values(Children)).
33 changes: 33 additions & 0 deletions test/cluster_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,39 @@ can_use_default_store_on_single_node(_Config) ->
trigger_id,
[foo],
[sproc], #{})),

ProjectionName1 = projection1,
?assertEqual(false, khepri:has_projection(ProjectionName1)),
?assertEqual(
false,
khepri:has_projection(ProjectionName1, #{favor => consistency})),
Projection1 = khepri_projection:new(ProjectionName1, copy),
?assertEqual(ok, khepri:register_projection("/**", Projection1)),
?assertEqual(
{error, ?khepri_error(
projection_already_exists,
#{name => ProjectionName1})},
khepri:register_projection("/**", Projection1)),
?assertEqual(
true,
khepri:has_projection(ProjectionName1, #{favor => consistency})),

ProjectionName2 = projection2,
?assertEqual(false, khepri:has_projection(ProjectionName2)),
Projection2 = khepri_projection:new(
ProjectionName2, copy,
#{read_concurrency => true, keypos => 2}),
?assertEqual(ok, khepri:register_projection("/**", Projection2, #{})),
?assertEqual(true, khepri:has_projection(ProjectionName2)),

?assertEqual(ok, khepri:unregister_projection(ProjectionName1)),
?assertEqual(
{error, ?khepri_error(
projection_not_found,
#{name => ProjectionName1})},
khepri:unregister_projection(ProjectionName1)),
?assertEqual(ok, khepri:unregister_projection(ProjectionName2, #{})),

?assertEqual({ok, ok}, khepri:transaction(fun() -> ok end)),
?assertEqual({ok, ok}, khepri:transaction(fun() -> ok end, ro)),
?assertEqual({ok, ok}, khepri:transaction(fun() -> ok end, ro, #{})),
Expand Down
13 changes: 12 additions & 1 deletion test/projections.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ trigger_simple_projection_on_path_test_() ->
?FUNCTION_NAME, PathPattern, Projection))
end)},

{"The store contains the projection",
?_assertEqual(
true,
khepri:has_projection(?FUNCTION_NAME, ?MODULE))},

{"Trigger the projection",
?_assertEqual(
ok,
Expand Down Expand Up @@ -283,7 +288,8 @@ duplicate_registrations_give_an_error_test_() ->
begin
Projection = khepri_projection:new(?MODULE, ProjectFun),
?assertEqual(
{error, exists},
{error, ?khepri_error(projection_already_exists,
#{name => ?MODULE})},
khepri:register_projection(
?FUNCTION_NAME, PathPattern, Projection))
end)}]
Expand Down Expand Up @@ -619,6 +625,11 @@ unregister_projection_test_() ->
ok,
khepri:unregister_projection(?FUNCTION_NAME, ?MODULE))},

{"The store no longer contains the projection",
?_assertEqual(
false,
khepri:has_projection(?FUNCTION_NAME, ?MODULE))},

{"The projection table no longer exists",
?_assertEqual(undefined, ets:info(?MODULE))},

Expand Down
Loading