Skip to content

Commit

Permalink
khepri_machine: Use an effect to initialize new projections
Browse files Browse the repository at this point in the history
Previously the machine would initialize projections in-place within
`apply/3`. That would cause projections to be restored ahead of time
when the machine was recovering if it applied a `#register_projection{}`
command.

We can use an aux effect instead for the creation and initial filling
of the projection's ETS table. Aux effects are skipped during recovery.
Once the machine is fully recovered and the server enters the
`recovered` state, we emit a `restore_projections` aux effect that
properly (re)creates and fills the projections. (This is already done by
`khepri_machine`.)
  • Loading branch information
the-mikedavis committed Mar 19, 2024
1 parent 216c290 commit b509150
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 19 deletions.
45 changes: 26 additions & 19 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,13 @@ handle_aux(
Projection <- Projections]
end),
{no_reply, AuxState, LogState};
handle_aux(
_RaState, cast,
#restore_projection{projection = Projection, pattern = PathPattern},
AuxState, LogState, State) ->
Tree = get_tree(State),
ok = restore_projection(Projection, Tree, PathPattern),
{no_reply, AuxState, LogState};
handle_aux(_RaState, _Type, _Command, AuxState, LogState, _MachineState) ->
{no_reply, AuxState, LogState}.

Expand Down Expand Up @@ -1208,26 +1215,26 @@ apply(
Meta,
#register_projection{pattern = PathPattern, projection = Projection},
State) ->
Tree = get_tree(State),
ProjectionName = Projection#khepri_projection.name,
ProjectionTree = get_projections(State),
Reply = khepri_projection:init(Projection),
State1 = case Reply of
ok ->
restore_projection(Projection, Tree, PathPattern),
ProjectionTree1 = khepri_pattern_tree:update(
ProjectionTree,
PathPattern,
fun (?NO_PAYLOAD) ->
[Projection];
(Projections) ->
[Projection | Projections]
end),
erase(compiled_projection_tree),
set_projections(State, ProjectionTree1);
_ ->
State
end,
Ret = {State1, Reply},
Ret = case has_projection(ProjectionTree, ProjectionName) of
true ->
{State, {error, exists}, []};
false ->
ProjectionTree1 = khepri_pattern_tree:update(
ProjectionTree,
PathPattern,
fun (?NO_PAYLOAD) ->
[Projection];
(Projections) ->
[Projection | Projections]
end),
erase(compiled_projection_tree),
AuxEffect = #restore_projection{projection = Projection,
pattern = PathPattern},
Effects = [{aux, AuxEffect}],
{set_projections(State, ProjectionTree1), ok, Effects}
end,
bump_applied_command_count(Ret, Meta);
apply(
Meta,
Expand Down
3 changes: 3 additions & 0 deletions src/khepri_machine.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,6 @@
old_props :: khepri:node_props(),
new_props :: khepri:node_props(),
projection :: khepri_projection:projection()}).

-record(restore_projection, {pattern :: khepri_path:native_pattern(),
projection :: khepri_projection:projection()}).

0 comments on commit b509150

Please sign in to comment.