Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make #khepri_machine{} private #252

Merged
merged 1 commit into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions src/khepri_import_export.erl
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,9 @@ export(StoreId, PathPattern, Module, ModulePriv)
Ret :: ok | {ok, ModulePriv} | {error, any()}.
%% @private

do_export(
#khepri_machine{tree = Tree} = State,
PathPattern, Module, ModulePriv) ->
do_export(State, PathPattern, Module, ModulePriv) ->
%% Initialize export using the callback module.
Tree = khepri_machine:get_tree(State),
case open_write(Module, ModulePriv) of
{ok, ModulePriv1} ->
%% Prepare the traversal.
Expand Down Expand Up @@ -274,10 +273,8 @@ open_write(Module, ModulePriv) ->
Ret :: {ok, ModulePriv} | {error, any()}.
%% @private

write(
#khepri_machine{tree = #tree{keep_while_conds = KeepWhileConds}},
Path, #node{payload = Payload},
Module, ModulePriv) ->
write(State, Path, #node{payload = Payload}, Module, ModulePriv) ->
#tree{keep_while_conds = KeepWhileConds} = khepri_machine:get_tree(State),
PutOptions = case KeepWhileConds of
#{Path := KeepWhile} -> #{keep_while => KeepWhile};
_ -> #{}
Expand Down
97 changes: 90 additions & 7 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,16 @@
process_query/3,
process_command/3]).

-ifdef(TEST).
-export([get_tree/1,
%% Functions to access the opaque #khepri_machine{} state.
-export([make_virgin_state/1,
is_state/1,
ensure_is_state/1,
get_tree/1,
get_root/1,
get_keep_while_conds/1,
get_keep_while_conds_revidx/1,
get_last_consistent_call_atomics/1]).
-endif.
get_last_consistent_call_atomics/1,
get_metrics/1]).

-compile({no_auto_import, [apply/3]}).

Expand Down Expand Up @@ -100,9 +103,27 @@
-type machine_config() :: #config{}.
%% Configuration record, holding read-only or rarely changing fields.

-type state() :: #?MODULE{}.
%% State machine's internal state record.
-record(khepri_machine,
{config = #config{} :: khepri_machine:machine_config(),
tree = #tree{} :: khepri_tree:tree(),
triggers = #{} :: khepri_machine:triggers_map(),
emitted_triggers = [] :: [khepri_machine:triggered()],
projections = khepri_pattern_tree:empty() ::
khepri_machine:projection_tree(),
metrics = #{} :: khepri_machine:metrics()}).

-opaque state() :: #?MODULE{}.
%% State of this Ra state machine.

-type triggers_map() :: #{khepri:trigger_id() =>
#{sproc := khepri_path:native_path(),
event_filter := khepri_evf:event_filter()}}.
%% Internal triggers map in the machine state.

-type metrics() :: #{applied_command_count => non_neg_integer()}.
%% Internal state machine metrics.

-type aux_state() :: #khepri_machine_aux{}.
%% Auxiliary state of this Ra state machine.

Expand All @@ -128,6 +149,8 @@

state/0,
machine_config/0,
triggers_map/0,
metrics/0,
props/0,
triggered/0,
projection_tree/0]).
Expand Down Expand Up @@ -1308,6 +1331,16 @@ emitted_triggers_to_side_effects(
emitted_triggers_to_side_effects(_State) ->
[].

-spec overview(State) -> Overview when
State :: khepri_machine:state(),
Overview :: #{store_id := StoreId,
tree := NodeTree,
triggers := Triggers,
keep_while_conds := KeepWhileConds},
StoreId :: khepri:store_id(),
NodeTree :: khepri_utils:display_tree(),
Triggers :: khepri_machine:triggers_map(),
KeepWhileConds :: khepri_tree:keep_while_conds_map().
%% @private

overview(#?MODULE{config = #config{store_id = StoreId},
Expand Down Expand Up @@ -1714,18 +1747,68 @@ get_compiled_projection_tree(SourceProjectionTree) ->
CompiledProjectionTree
end.

-ifdef(TEST).
-spec make_virgin_state(Config) -> State when
Config :: khepri_machine:machine_config(),
State :: khepri_machine:state().
%% @private

make_virgin_state(Config) ->
#?MODULE{config = Config}.

-spec is_state(State) -> IsState when
State :: khepri_machine:state(),
IsState :: boolean().
%% @private

is_state(State) ->
is_record(State, khepri_machine).

-spec ensure_is_state(State) -> ok when
State :: khepri_machine:state().
%% @private

ensure_is_state(State) ->
?assert(is_state(State)),
ok.

-spec get_tree(State) -> Tree when
State :: khepri_machine:state(),
Tree :: khepri_tree:tree().
%% @private

get_tree(#?MODULE{tree = Tree}) ->
Tree.

-spec get_root(State) -> Root when
State :: khepri_machine:state(),
Root :: khepri_tree:tree_node().
%% @private

get_root(#?MODULE{tree = #tree{root = Root}}) ->
Root.

-spec get_keep_while_conds(State) -> KeepWhileConds when
State :: khepri_machine:state(),
KeepWhileConds :: khepri_tree:keep_while_conds_map().
%% @private

get_keep_while_conds(
#?MODULE{tree = #tree{keep_while_conds = KeepWhileConds}}) ->
KeepWhileConds.

-spec get_keep_while_conds_revidx(State) -> KeepWhileCondsRevIdx when
State :: khepri_machine:state(),
KeepWhileCondsRevIdx :: khepri_tree:keep_while_conds_revidx().
%% @private

get_keep_while_conds_revidx(
#?MODULE{tree = #tree{keep_while_conds_revidx = KeepWhileCondsRevIdx}}) ->
KeepWhileCondsRevIdx.
-endif.

-spec get_metrics(State) -> Metrics when
State :: khepri_machine:state(),
Metrics :: khepri_machine:metrics().
%% @private

get_metrics(#?MODULE{metrics = Metrics}) ->
Metrics.
13 changes: 0 additions & 13 deletions src/khepri_machine.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,6 @@
member :: ra:server_id(),
snapshot_interval = ?SNAPSHOT_INTERVAL :: non_neg_integer()}).

%% State machine's internal state record.
-record(khepri_machine,
{config = #config{} :: khepri_machine:machine_config(),
tree = #tree{} :: khepri_tree:tree(),
triggers = #{} ::
#{khepri:trigger_id() =>
#{sproc := khepri_path:native_path(),
event_filter := khepri_evf:event_filter()}},
emitted_triggers = [] :: [khepri_machine:triggered()],
projections = khepri_pattern_tree:empty() ::
khepri_machine:projection_tree(),
metrics = #{} :: #{applied_command_count => non_neg_integer()}}).

-record(khepri_machine_aux,
{store_id :: khepri:store_id()}).

Expand Down
12 changes: 6 additions & 6 deletions src/khepri_tx.erl
Original file line number Diff line number Diff line change
Expand Up @@ -459,8 +459,8 @@ count(PathPattern) ->

count(PathPattern, Options) ->
PathPattern1 = khepri_tx_adv:path_from_string(PathPattern),
{#khepri_machine{tree = Tree},
_SideEffects} = khepri_tx_adv:get_tx_state(),
{State, _SideEffects} = khepri_tx_adv:get_tx_state(),
Tree = khepri_machine:get_tree(State),
Fun = fun khepri_tree:count_node_cb/3,
{_QueryOptions, TreeOptions} = khepri_machine:split_query_options(Options),
TreeOptions1 = TreeOptions#{expect_specific_node => false},
Expand Down Expand Up @@ -510,8 +510,8 @@ fold(PathPattern, Fun, Acc) ->

fold(PathPattern, Fun, Acc, Options) ->
PathPattern1 = khepri_tx_adv:path_from_string(PathPattern),
{#khepri_machine{tree = Tree},
_SideEffects} = khepri_tx_adv:get_tx_state(),
{State, _SideEffects} = khepri_tx_adv:get_tx_state(),
Tree = khepri_machine:get_tree(State),
{_QueryOptions, TreeOptions} = khepri_machine:split_query_options(Options),
TreeOptions1 = TreeOptions#{expect_specific_node => false},
Ret = khepri_tree:fold(Tree, PathPattern1, Fun, Acc, TreeOptions1),
Expand Down Expand Up @@ -996,6 +996,6 @@ abort(Reason) ->
is_transaction() ->
StateAndSideEffects = erlang:get(?TX_STATE_KEY),
case StateAndSideEffects of
{#khepri_machine{}, _SideEffects} -> true;
_ -> false
{_State, _SideEffects} -> true;
_ -> false
end.
15 changes: 9 additions & 6 deletions src/khepri_tx_adv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ get_many(PathPattern, Options) ->
do_get_many(PathPattern, Fun, Acc, Options) ->
PathPattern1 = path_from_string(PathPattern),
{_QueryOptions, TreeOptions} = khepri_machine:split_query_options(Options),
{#khepri_machine{tree = Tree}, _SideEffects} = get_tx_state(),
{State, _SideEffects} = get_tx_state(),
Tree = khepri_machine:get_tree(State),
Ret = khepri_tree:fold(Tree, PathPattern1, Fun, Acc, TreeOptions),
case Ret of
{error, ?khepri_exception(_, _) = Exception} ->
Expand Down Expand Up @@ -981,7 +982,7 @@ run(State, StandaloneFun, Args, AllowUpdates)

{NewState, NewSideEffects} = erlang:erase(?TX_STATE_KEY),
NewTxProps = erlang:erase(?TX_PROPS),
?assert(is_record(NewState, khepri_machine)),
?assert(khepri_machine:is_state(NewState)),
?assertEqual(TxProps, NewTxProps),
{NewState, Ret, NewSideEffects}
catch
Expand Down Expand Up @@ -1010,7 +1011,8 @@ handle_state_for_call(Fun) ->

get_tx_state() ->
case erlang:get(?TX_STATE_KEY) of
{#khepri_machine{}, _SideEffects} = StateAndSideEffects ->
{State, _SideEffects} = StateAndSideEffects ->
khepri_machine:ensure_is_state(State),
StateAndSideEffects;
undefined ->
?khepri_misuse(invalid_use_of_khepri_tx_outside_transaction, #{})
Expand All @@ -1021,9 +1023,10 @@ get_tx_state() ->
SideEffects :: ra_machine:effects().
%% @private

set_tx_state(#khepri_machine{} = NewState, SideEffects) ->
_ = erlang:put(?TX_STATE_KEY, {NewState, SideEffects}),
ok.
set_tx_state(NewState, SideEffects) ->
khepri_machine:ensure_is_state(NewState),
_ = erlang:put(?TX_STATE_KEY, {NewState, SideEffects}),
ok.

-spec get_tx_props() -> TxProps when
TxProps :: tx_props().
Expand Down
2 changes: 2 additions & 0 deletions src/khepri_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
child_nodes => #{khepri_path:node_id() =>
display_tree()}}.

-export_type([display_tree/0]).

-spec start_timeout_window(Timeout) -> Timestamp | none when
Timeout :: timeout(),
Timestamp :: integer().
Expand Down
8 changes: 4 additions & 4 deletions test/cluster_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ can_set_snapshot_interval(Config) ->
#{},
khepri_machine:process_query(
StoreId,
fun(#khepri_machine{metrics = Metrics}) -> Metrics end,
fun khepri_machine:get_metrics/1,
#{})),

ct:pal("Use database after starting it"),
Expand All @@ -1547,7 +1547,7 @@ can_set_snapshot_interval(Config) ->
#{applied_command_count => 1},
khepri_machine:process_query(
StoreId,
fun(#khepri_machine{metrics = Metrics}) -> Metrics end,
fun khepri_machine:get_metrics/1,
#{})),

ct:pal("Use database after starting it"),
Expand All @@ -1558,7 +1558,7 @@ can_set_snapshot_interval(Config) ->
#{applied_command_count => 2},
khepri_machine:process_query(
StoreId,
fun(#khepri_machine{metrics = Metrics}) -> Metrics end,
fun khepri_machine:get_metrics/1,
#{})),

ct:pal("Use database after starting it"),
Expand All @@ -1569,7 +1569,7 @@ can_set_snapshot_interval(Config) ->
#{},
khepri_machine:process_query(
StoreId,
fun(#khepri_machine{metrics = Metrics}) -> Metrics end,
fun khepri_machine:get_metrics/1,
#{})),

ct:pal("Stop database"),
Expand Down
28 changes: 18 additions & 10 deletions test/delete_command.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ delete_non_existing_node_test() ->
{S1, Ret, SE} = khepri_machine:apply(?META, Command, S0),

?assertEqual(
S0#khepri_machine.tree#tree.root,
S1#khepri_machine.tree#tree.root),
?assertEqual(#{applied_command_count => 1}, S1#khepri_machine.metrics),
khepri_machine:get_root(S0),
khepri_machine:get_root(S1)),
?assertEqual(
#{applied_command_count => 1},
khepri_machine:get_metrics(S1)),
?assertEqual({ok, #{}}, Ret),
?assertEqual([], SE).

Expand All @@ -34,9 +36,11 @@ delete_non_existing_node_under_non_existing_parent_test() ->
{S1, Ret, SE} = khepri_machine:apply(?META, Command, S0),

?assertEqual(
S0#khepri_machine.tree#tree.root,
S1#khepri_machine.tree#tree.root),
?assertEqual(#{applied_command_count => 1}, S1#khepri_machine.metrics),
khepri_machine:get_root(S0),
khepri_machine:get_root(S1)),
?assertEqual(
#{applied_command_count => 1},
khepri_machine:get_metrics(S1)),
?assertEqual({ok, #{}}, Ret),
?assertEqual([], SE).

Expand Down Expand Up @@ -302,23 +306,27 @@ delete_command_bumps_applied_command_count_test() ->
snapshot_interval => 3,
commands => Commands}),

?assertEqual(#{}, S0#khepri_machine.metrics),
?assertEqual(#{}, khepri_machine:get_metrics(S0)),

Command1 = #delete{path = [bar]},
{S1, _, SE1} = khepri_machine:apply(?META, Command1, S0),

?assertEqual(#{applied_command_count => 1}, S1#khepri_machine.metrics),
?assertEqual(
#{applied_command_count => 1},
khepri_machine:get_metrics(S1)),
?assertEqual([], SE1),

Command2 = #delete{path = [baz]},
{S2, _, SE2} = khepri_machine:apply(?META, Command2, S1),

?assertEqual(#{applied_command_count => 2}, S2#khepri_machine.metrics),
?assertEqual(
#{applied_command_count => 2},
khepri_machine:get_metrics(S2)),
?assertEqual([], SE2),

Command3 = #delete{path = [qux]},
Meta = ?META,
{S3, _, SE3} = khepri_machine:apply(Meta, Command3, S2),

?assertEqual(#{}, S3#khepri_machine.metrics),
?assertEqual(#{}, khepri_machine:get_metrics(S3)),
?assertEqual([{release_cursor, maps:get(index, Meta), S3}], SE3).
Loading
Loading