diff --git a/src/khepri.erl b/src/khepri.erl index 471af623..607ba447 100644 --- a/src/khepri.erl +++ b/src/khepri.erl @@ -119,6 +119,7 @@ register_projection/2, register_projection/3, register_projection/4, unregister_projection/1, unregister_projection/2, unregister_projection/3, + has_projection/1, has_projection/2, has_projection/3, %% Transactions; `khepri_tx' provides the API to use inside %% transaction functions. @@ -3026,6 +3027,85 @@ unregister_projection(StoreId, ProjectionName, Options) is_map(Options) -> khepri_machine:unregister_projection(StoreId, ProjectionName, Options). +%% ------------------------------------------------------------------- +%% has_projection(). +%% ------------------------------------------------------------------- + +-spec has_projection(ProjectionName) -> Ret when + ProjectionName :: atom(), + Ret :: boolean() | khepri:error(). +%% @doc Determines whether the store has a projection registered with the given +%% name. +%% +%% Calling this function is the same as calling +%% `has_projection(StoreId, ProjectionName)' with the default store ID +%% (see {@link khepri_cluster:get_default_store_id/0}). +%% +%% @see has_projection/2. + +has_projection(ProjectionName) when is_atom(ProjectionName) -> + StoreId = khepri_cluster:get_default_store_id(), + has_projection(StoreId, ProjectionName). + +-spec has_projection +(StoreId, ProjectionName) -> Ret when + StoreId :: khepri:store_id(), + ProjectionName :: atom(), + Ret :: boolean() | khepri:error(); +(ProjectionName, Options) -> Ret when + ProjectionName :: atom(), + Options :: khepri:query_options(), + Ret :: boolean() | khepri:error(). +%% @doc Determines whether the store has a projection registered with the given +%% name. +%% +%% This function accepts the following two forms: +%% +%% +%% @see has_projection/3. + +has_projection(StoreId, ProjectionName) + when ?IS_KHEPRI_STORE_ID(StoreId) andalso is_atom(ProjectionName) -> + has_projection(StoreId, ProjectionName, #{}); +has_projection(ProjectionName, Options) + when is_atom(ProjectionName) andalso is_map(Options) -> + StoreId = khepri_cluster:get_default_store_id(), + has_projection(StoreId, ProjectionName, Options). + +-spec has_projection(StoreId, ProjectionName, Options) -> + Ret when + StoreId :: khepri:store_id(), + ProjectionName :: atom(), + Options :: khepri:query_options(), + Ret :: boolean() | khepri:error(). +%% @doc Determines whether the store has a projection registered with the given +%% name. +%% +%% @param StoreId the name of the Khepri store. +%% @param ProjectionName the name of the projection to has as passed to +%% {@link khepri_projection:new/3}. +%% @param Options query options. +%% @returns `true' if the store contains a projection registered with the given +%% name, `false' if it does not, or an `{error, Reason}' tuple if the query +%% failed. + +has_projection(StoreId, ProjectionName, Options) + when ?IS_KHEPRI_STORE_ID(StoreId) andalso is_atom(ProjectionName) andalso + is_map(Options) -> + case khepri_machine:get_projections_state(StoreId, Options) of + {ok, ProjectionTree} -> + khepri_machine:has_projection(ProjectionTree, ProjectionName); + {error, _} = Error -> + Error + end. + %% ------------------------------------------------------------------- %% transaction(). %% ------------------------------------------------------------------- diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl index 97c7a172..89bff323 100644 --- a/src/khepri_machine.erl +++ b/src/khepri_machine.erl @@ -76,6 +76,7 @@ get_triggers/1, get_emitted_triggers/1, get_projections/1, + has_projection/2, get_metrics/1]). -ifdef(TEST). -export([make_virgin_state/1]). @@ -1121,6 +1122,13 @@ handle_aux( Projection <- Projections] end), {no_reply, AuxState, LogState}; +handle_aux( + _RaState, cast, + #restore_projection{projection = Projection, pattern = PathPattern}, + AuxState, LogState, State) -> + Tree = get_tree(State), + ok = restore_projection(Projection, Tree, PathPattern), + {no_reply, AuxState, LogState}; handle_aux(_RaState, _Type, _Command, AuxState, LogState, _MachineState) -> {no_reply, AuxState, LogState}. @@ -1207,27 +1215,34 @@ apply( Meta, #register_projection{pattern = PathPattern, projection = Projection}, State) -> - Tree = get_tree(State), + ProjectionName = khepri_projection:name(Projection), ProjectionTree = get_projections(State), - Reply = khepri_projection:init(Projection), - State1 = case Reply of - ok -> - restore_projection(Projection, Tree, PathPattern), - ProjectionTree1 = khepri_pattern_tree:update( - ProjectionTree, - PathPattern, - fun (?NO_PAYLOAD) -> - [Projection]; - (Projections) -> - [Projection | Projections] - end), - erase(compiled_projection_tree), - set_projections(State, ProjectionTree1); - _ -> - State - end, - Ret = {State1, Reply}, - bump_applied_command_count(Ret, Meta); + case has_projection(ProjectionTree, ProjectionName) of + true -> + Info = #{name => ProjectionName}, + Reason = ?khepri_error(projection_already_exists, Info), + Reply = {error, Reason}, + Ret = {State, Reply}, + bump_applied_command_count(Ret, Meta); + false -> + ProjectionTree1 = khepri_pattern_tree:update( + ProjectionTree, + PathPattern, + fun (?NO_PAYLOAD) -> + [Projection]; + (Projections) -> + [Projection | Projections] + end), + %% The new projection has been registered so the cached compiled + %% projection tree needs to be erased. + clear_compiled_projection_tree(), + State1 = set_projections(State, ProjectionTree1), + AuxEffect = #restore_projection{projection = Projection, + pattern = PathPattern}, + Effects = [{aux, AuxEffect}], + Ret = {State1, ok, Effects}, + bump_applied_command_count(Ret, Meta) + end; apply( Meta, #unregister_projection{name = ProjectionName}, @@ -1259,7 +1274,7 @@ apply( Reason = ?khepri_error(projection_not_found, Info), {error, Reason}; _ -> - erase(compiled_projection_tree), + clear_compiled_projection_tree(), ok end, State1 = set_projections(State, ProjectionTree1), @@ -1766,6 +1781,20 @@ get_compiled_projection_tree(SourceProjectionTree) -> CompiledProjectionTree end. +-spec clear_compiled_projection_tree() -> ok. +%% @doc Erases the cached projection tree. +%% +%% This function should be called whenever the projection tree is changed: +%% whenever a projection is registered or unregistered. +%% +%% @see get_compiled_projection_tree/1. +%% +%% @private + +clear_compiled_projection_tree() -> + erase(compiled_projection_tree), + ok. + %% ------------------------------------------------------------------- %% State record management functions. %% ------------------------------------------------------------------- @@ -1907,6 +1936,25 @@ set_emitted_triggers(#khepri_machine{} = State, EmittedTriggers) -> get_projections(#khepri_machine{projections = Projections}) -> Projections. +-spec has_projection(ProjectionTree, ProjectionName) -> boolean() when + ProjectionTree :: khepri_machine:projection_tree(), + ProjectionName :: atom(). +%% @doc Determines if the given projection tree contains a projection. +%% +%% Two projections are considered equal if they have the same name. +%% +%% @private + +has_projection(ProjectionTree, Name) when is_atom(Name) -> + khepri_pattern_tree:any( + ProjectionTree, + fun(Projections) -> + lists:any( + fun(#khepri_projection{name = N}) -> + N =:= Name + end, Projections) + end). + -spec set_projections(State, Projections) -> NewState when State :: khepri_machine:state(), Projections :: khepri_machine:projection_tree(), diff --git a/src/khepri_machine.hrl b/src/khepri_machine.hrl index 42a68af6..590a3173 100644 --- a/src/khepri_machine.hrl +++ b/src/khepri_machine.hrl @@ -53,3 +53,6 @@ old_props :: khepri:node_props(), new_props :: khepri:node_props(), projection :: khepri_projection:projection()}). + +-record(restore_projection, {pattern :: khepri_path:native_pattern(), + projection :: khepri_projection:projection()}). diff --git a/src/khepri_pattern_tree.erl b/src/khepri_pattern_tree.erl index b2f2d7b3..23d2faa3 100644 --- a/src/khepri_pattern_tree.erl +++ b/src/khepri_pattern_tree.erl @@ -42,6 +42,8 @@ -type update_fun(Payload) :: fun((Payload) -> Payload). +-type find_fun(Payload) :: fun((Payload) -> boolean()). + -export_type([tree_node/1, tree/1]). @@ -51,7 +53,8 @@ fold/5, foreach/2, compile/1, - filtermap/2]). + filtermap/2, + any/2]). -spec empty() -> TreeNode when TreeNode :: khepri_pattern_tree:tree(Payload), @@ -331,3 +334,21 @@ filtermap1( end end, ChildNodes0), #pattern_node{payload = Payload, child_nodes = ChildNodes}. + +-spec any(PatternTree, FindFun) -> Ret when + PatternTree :: khepri_pattern_tree:tree(Payload), + FindFun :: find_fun(Payload), + Ret :: payload() | undefined, + Payload :: payload(). +%% @doc Determines whether the pattern tree contains a tree node with a payload +%% that matches the given predicate. +%% +%% @param PatternTree the tree to search. +%% @param FindFun the predicate with which to evaluate tree nodes. +%% @returns `true' if the predicate evaluates as `true' for any payload, +%% `false' otherwise. + +any(#pattern_node{payload = Payload, child_nodes = Children}, FindFun) -> + (Payload =/= ?NO_PAYLOAD andalso FindFun(Payload)) orelse + lists:any( + fun(Child) -> any(Child, FindFun) end, maps:values(Children)). diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl index b76e9a3f..b7e15cab 100644 --- a/test/cluster_SUITE.erl +++ b/test/cluster_SUITE.erl @@ -1409,6 +1409,39 @@ can_use_default_store_on_single_node(_Config) -> trigger_id, [foo], [sproc], #{})), + + ProjectionName1 = projection1, + ?assertEqual(false, khepri:has_projection(ProjectionName1)), + ?assertEqual( + false, + khepri:has_projection(ProjectionName1, #{favor => consistency})), + Projection1 = khepri_projection:new(ProjectionName1, copy), + ?assertEqual(ok, khepri:register_projection("/**", Projection1)), + ?assertEqual( + {error, ?khepri_error( + projection_already_exists, + #{name => ProjectionName1})}, + khepri:register_projection("/**", Projection1)), + ?assertEqual( + true, + khepri:has_projection(ProjectionName1, #{favor => consistency})), + + ProjectionName2 = projection2, + ?assertEqual(false, khepri:has_projection(ProjectionName2)), + Projection2 = khepri_projection:new( + ProjectionName2, copy, + #{read_concurrency => true, keypos => 2}), + ?assertEqual(ok, khepri:register_projection("/**", Projection2, #{})), + ?assertEqual(true, khepri:has_projection(ProjectionName2)), + + ?assertEqual(ok, khepri:unregister_projection(ProjectionName1)), + ?assertEqual( + {error, ?khepri_error( + projection_not_found, + #{name => ProjectionName1})}, + khepri:unregister_projection(ProjectionName1)), + ?assertEqual(ok, khepri:unregister_projection(ProjectionName2, #{})), + ?assertEqual({ok, ok}, khepri:transaction(fun() -> ok end)), ?assertEqual({ok, ok}, khepri:transaction(fun() -> ok end, ro)), ?assertEqual({ok, ok}, khepri:transaction(fun() -> ok end, ro, #{})), diff --git a/test/projections.erl b/test/projections.erl index be60b2d9..ab1bf627 100644 --- a/test/projections.erl +++ b/test/projections.erl @@ -33,6 +33,11 @@ trigger_simple_projection_on_path_test_() -> ?FUNCTION_NAME, PathPattern, Projection)) end)}, + {"The store contains the projection", + ?_assertEqual( + true, + khepri:has_projection(?FUNCTION_NAME, ?MODULE))}, + {"Trigger the projection", ?_assertEqual( ok, @@ -283,7 +288,8 @@ duplicate_registrations_give_an_error_test_() -> begin Projection = khepri_projection:new(?MODULE, ProjectFun), ?assertEqual( - {error, exists}, + {error, ?khepri_error(projection_already_exists, + #{name => ?MODULE})}, khepri:register_projection( ?FUNCTION_NAME, PathPattern, Projection)) end)}] @@ -619,6 +625,11 @@ unregister_projection_test_() -> ok, khepri:unregister_projection(?FUNCTION_NAME, ?MODULE))}, + {"The store no longer contains the projection", + ?_assertEqual( + false, + khepri:has_projection(?FUNCTION_NAME, ?MODULE))}, + {"The projection table no longer exists", ?_assertEqual(undefined, ets:info(?MODULE))},