Skip to content

Commit

Permalink
Merge pull request #255 from rabbitmq/md-has-projection/2-and-effect-…
Browse files Browse the repository at this point in the history
…free-register

Use effects to initialize new projections
  • Loading branch information
dumbbell authored Mar 27, 2024
2 parents 2151d8a + 06712a1 commit e3b7b7b
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 23 deletions.
80 changes: 80 additions & 0 deletions src/khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
register_projection/2, register_projection/3, register_projection/4,
unregister_projection/1, unregister_projection/2,
unregister_projection/3,
has_projection/1, has_projection/2, has_projection/3,

%% Transactions; `khepri_tx' provides the API to use inside
%% transaction functions.
Expand Down Expand Up @@ -3026,6 +3027,85 @@ unregister_projection(StoreId, ProjectionName, Options)
is_map(Options) ->
khepri_machine:unregister_projection(StoreId, ProjectionName, Options).

%% -------------------------------------------------------------------
%% has_projection().
%% -------------------------------------------------------------------

-spec has_projection(ProjectionName) -> Ret when
ProjectionName :: atom(),
Ret :: boolean() | khepri:error().
%% @doc Determines whether the store has a projection registered with the given
%% name.
%%
%% Calling this function is the same as calling
%% `has_projection(StoreId, ProjectionName)' with the default store ID
%% (see {@link khepri_cluster:get_default_store_id/0}).
%%
%% @see has_projection/2.

has_projection(ProjectionName) when is_atom(ProjectionName) ->
StoreId = khepri_cluster:get_default_store_id(),
has_projection(StoreId, ProjectionName).

-spec has_projection
(StoreId, ProjectionName) -> Ret when
StoreId :: khepri:store_id(),
ProjectionName :: atom(),
Ret :: boolean() | khepri:error();
(ProjectionName, Options) -> Ret when
ProjectionName :: atom(),
Options :: khepri:query_options(),
Ret :: boolean() | khepri:error().
%% @doc Determines whether the store has a projection registered with the given
%% name.
%%
%% This function accepts the following two forms:
%% <ul>
%% <li>`has_projection(StoreId, ProjectionName)'. Calling it is the same
%% as calling `has_projection(StoreId, ProjectionName, #{})'.</li>
%% <li>`has_projection(ProjectionName, Options)'. Calling it is the same
%% as calling `has_projection(StoreId, ProjectionName, Options)' with
%% the default store ID (see {@link khepri_cluster:get_default_store_id/0}).
%% </li>
%% </ul>
%%
%% @see has_projection/3.

has_projection(StoreId, ProjectionName)
when ?IS_KHEPRI_STORE_ID(StoreId) andalso is_atom(ProjectionName) ->
has_projection(StoreId, ProjectionName, #{});
has_projection(ProjectionName, Options)
when is_atom(ProjectionName) andalso is_map(Options) ->
StoreId = khepri_cluster:get_default_store_id(),
has_projection(StoreId, ProjectionName, Options).

-spec has_projection(StoreId, ProjectionName, Options) ->
Ret when
StoreId :: khepri:store_id(),
ProjectionName :: atom(),
Options :: khepri:query_options(),
Ret :: boolean() | khepri:error().
%% @doc Determines whether the store has a projection registered with the given
%% name.
%%
%% @param StoreId the name of the Khepri store.
%% @param ProjectionName the name of the projection to has as passed to
%% {@link khepri_projection:new/3}.
%% @param Options query options.
%% @returns `true' if the store contains a projection registered with the given
%% name, `false' if it does not, or an `{error, Reason}' tuple if the query
%% failed.

has_projection(StoreId, ProjectionName, Options)
when ?IS_KHEPRI_STORE_ID(StoreId) andalso is_atom(ProjectionName) andalso
is_map(Options) ->
case khepri_machine:get_projections_state(StoreId, Options) of
{ok, ProjectionTree} ->
khepri_machine:has_projection(ProjectionTree, ProjectionName);
{error, _} = Error ->
Error
end.

%% -------------------------------------------------------------------
%% transaction().
%% -------------------------------------------------------------------
Expand Down
90 changes: 69 additions & 21 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
get_triggers/1,
get_emitted_triggers/1,
get_projections/1,
has_projection/2,
get_metrics/1]).
-ifdef(TEST).
-export([make_virgin_state/1]).
Expand Down Expand Up @@ -1121,6 +1122,13 @@ handle_aux(
Projection <- Projections]
end),
{no_reply, AuxState, LogState};
handle_aux(
_RaState, cast,
#restore_projection{projection = Projection, pattern = PathPattern},
AuxState, LogState, State) ->
Tree = get_tree(State),
ok = restore_projection(Projection, Tree, PathPattern),
{no_reply, AuxState, LogState};
handle_aux(_RaState, _Type, _Command, AuxState, LogState, _MachineState) ->
{no_reply, AuxState, LogState}.

Expand Down Expand Up @@ -1207,27 +1215,34 @@ apply(
Meta,
#register_projection{pattern = PathPattern, projection = Projection},
State) ->
Tree = get_tree(State),
ProjectionName = khepri_projection:name(Projection),
ProjectionTree = get_projections(State),
Reply = khepri_projection:init(Projection),
State1 = case Reply of
ok ->
restore_projection(Projection, Tree, PathPattern),
ProjectionTree1 = khepri_pattern_tree:update(
ProjectionTree,
PathPattern,
fun (?NO_PAYLOAD) ->
[Projection];
(Projections) ->
[Projection | Projections]
end),
erase(compiled_projection_tree),
set_projections(State, ProjectionTree1);
_ ->
State
end,
Ret = {State1, Reply},
bump_applied_command_count(Ret, Meta);
case has_projection(ProjectionTree, ProjectionName) of
true ->
Info = #{name => ProjectionName},
Reason = ?khepri_error(projection_already_exists, Info),
Reply = {error, Reason},
Ret = {State, Reply},
bump_applied_command_count(Ret, Meta);
false ->
ProjectionTree1 = khepri_pattern_tree:update(
ProjectionTree,
PathPattern,
fun (?NO_PAYLOAD) ->
[Projection];
(Projections) ->
[Projection | Projections]
end),
%% The new projection has been registered so the cached compiled
%% projection tree needs to be erased.
clear_compiled_projection_tree(),
State1 = set_projections(State, ProjectionTree1),
AuxEffect = #restore_projection{projection = Projection,
pattern = PathPattern},
Effects = [{aux, AuxEffect}],
Ret = {State1, ok, Effects},
bump_applied_command_count(Ret, Meta)
end;
apply(
Meta,
#unregister_projection{name = ProjectionName},
Expand Down Expand Up @@ -1259,7 +1274,7 @@ apply(
Reason = ?khepri_error(projection_not_found, Info),
{error, Reason};
_ ->
erase(compiled_projection_tree),
clear_compiled_projection_tree(),
ok
end,
State1 = set_projections(State, ProjectionTree1),
Expand Down Expand Up @@ -1766,6 +1781,20 @@ get_compiled_projection_tree(SourceProjectionTree) ->
CompiledProjectionTree
end.

-spec clear_compiled_projection_tree() -> ok.
%% @doc Erases the cached projection tree.
%%
%% This function should be called whenever the projection tree is changed:
%% whenever a projection is registered or unregistered.
%%
%% @see get_compiled_projection_tree/1.
%%
%% @private

clear_compiled_projection_tree() ->
erase(compiled_projection_tree),
ok.

%% -------------------------------------------------------------------
%% State record management functions.
%% -------------------------------------------------------------------
Expand Down Expand Up @@ -1907,6 +1936,25 @@ set_emitted_triggers(#khepri_machine{} = State, EmittedTriggers) ->
get_projections(#khepri_machine{projections = Projections}) ->
Projections.

-spec has_projection(ProjectionTree, ProjectionName) -> boolean() when
ProjectionTree :: khepri_machine:projection_tree(),
ProjectionName :: atom().
%% @doc Determines if the given projection tree contains a projection.
%%
%% Two projections are considered equal if they have the same name.
%%
%% @private

has_projection(ProjectionTree, Name) when is_atom(Name) ->
khepri_pattern_tree:any(
ProjectionTree,
fun(Projections) ->
lists:any(
fun(#khepri_projection{name = N}) ->
N =:= Name
end, Projections)
end).

-spec set_projections(State, Projections) -> NewState when
State :: khepri_machine:state(),
Projections :: khepri_machine:projection_tree(),
Expand Down
3 changes: 3 additions & 0 deletions src/khepri_machine.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,6 @@
old_props :: khepri:node_props(),
new_props :: khepri:node_props(),
projection :: khepri_projection:projection()}).

-record(restore_projection, {pattern :: khepri_path:native_pattern(),
projection :: khepri_projection:projection()}).
23 changes: 22 additions & 1 deletion src/khepri_pattern_tree.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

-type update_fun(Payload) :: fun((Payload) -> Payload).

-type find_fun(Payload) :: fun((Payload) -> boolean()).

-export_type([tree_node/1,
tree/1]).

Expand All @@ -51,7 +53,8 @@
fold/5,
foreach/2,
compile/1,
filtermap/2]).
filtermap/2,
any/2]).

-spec empty() -> TreeNode when
TreeNode :: khepri_pattern_tree:tree(Payload),
Expand Down Expand Up @@ -331,3 +334,21 @@ filtermap1(
end
end, ChildNodes0),
#pattern_node{payload = Payload, child_nodes = ChildNodes}.

-spec any(PatternTree, FindFun) -> Ret when
PatternTree :: khepri_pattern_tree:tree(Payload),
FindFun :: find_fun(Payload),
Ret :: payload() | undefined,
Payload :: payload().
%% @doc Determines whether the pattern tree contains a tree node with a payload
%% that matches the given predicate.
%%
%% @param PatternTree the tree to search.
%% @param FindFun the predicate with which to evaluate tree nodes.
%% @returns `true' if the predicate evaluates as `true' for any payload,
%% `false' otherwise.

any(#pattern_node{payload = Payload, child_nodes = Children}, FindFun) ->
(Payload =/= ?NO_PAYLOAD andalso FindFun(Payload)) orelse
lists:any(
fun(Child) -> any(Child, FindFun) end, maps:values(Children)).
33 changes: 33 additions & 0 deletions test/cluster_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1409,6 +1409,39 @@ can_use_default_store_on_single_node(_Config) ->
trigger_id,
[foo],
[sproc], #{})),

ProjectionName1 = projection1,
?assertEqual(false, khepri:has_projection(ProjectionName1)),
?assertEqual(
false,
khepri:has_projection(ProjectionName1, #{favor => consistency})),
Projection1 = khepri_projection:new(ProjectionName1, copy),
?assertEqual(ok, khepri:register_projection("/**", Projection1)),
?assertEqual(
{error, ?khepri_error(
projection_already_exists,
#{name => ProjectionName1})},
khepri:register_projection("/**", Projection1)),
?assertEqual(
true,
khepri:has_projection(ProjectionName1, #{favor => consistency})),

ProjectionName2 = projection2,
?assertEqual(false, khepri:has_projection(ProjectionName2)),
Projection2 = khepri_projection:new(
ProjectionName2, copy,
#{read_concurrency => true, keypos => 2}),
?assertEqual(ok, khepri:register_projection("/**", Projection2, #{})),
?assertEqual(true, khepri:has_projection(ProjectionName2)),

?assertEqual(ok, khepri:unregister_projection(ProjectionName1)),
?assertEqual(
{error, ?khepri_error(
projection_not_found,
#{name => ProjectionName1})},
khepri:unregister_projection(ProjectionName1)),
?assertEqual(ok, khepri:unregister_projection(ProjectionName2, #{})),

?assertEqual({ok, ok}, khepri:transaction(fun() -> ok end)),
?assertEqual({ok, ok}, khepri:transaction(fun() -> ok end, ro)),
?assertEqual({ok, ok}, khepri:transaction(fun() -> ok end, ro, #{})),
Expand Down
13 changes: 12 additions & 1 deletion test/projections.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ trigger_simple_projection_on_path_test_() ->
?FUNCTION_NAME, PathPattern, Projection))
end)},

{"The store contains the projection",
?_assertEqual(
true,
khepri:has_projection(?FUNCTION_NAME, ?MODULE))},

{"Trigger the projection",
?_assertEqual(
ok,
Expand Down Expand Up @@ -283,7 +288,8 @@ duplicate_registrations_give_an_error_test_() ->
begin
Projection = khepri_projection:new(?MODULE, ProjectFun),
?assertEqual(
{error, exists},
{error, ?khepri_error(projection_already_exists,
#{name => ?MODULE})},
khepri:register_projection(
?FUNCTION_NAME, PathPattern, Projection))
end)}]
Expand Down Expand Up @@ -619,6 +625,11 @@ unregister_projection_test_() ->
ok,
khepri:unregister_projection(?FUNCTION_NAME, ?MODULE))},

{"The store no longer contains the projection",
?_assertEqual(
false,
khepri:has_projection(?FUNCTION_NAME, ?MODULE))},

{"The projection table no longer exists",
?_assertEqual(undefined, ets:info(?MODULE))},

Expand Down

0 comments on commit e3b7b7b

Please sign in to comment.