From ae516c7b69942551d168cfc80c0823c1213dcf27 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Wed, 24 Jul 2024 14:30:34 -0400 Subject: [PATCH 1/3] ra_snapshot: Defer validation for older checkpoints This refactors `ra_snapshot:find_checkpoints/1` to cut down on some work when there are many checkpoints. We scan through the checkpoint directories to find the first (latest) valid checkpoint we can use for recovery. Then we can defer using the `ra_snapshot:validate/1` callback (which can be somewhat expensive) for any older checkpoints. We assume that any checkpoints older than the latest valid checkpoint are valid. We expect that invalid checkpoints would be created when a machine terminates hard and unexpectedly and may stop an in-progress write or leave a checkpoint file unsynced. This should only affect some number of the latest checkpoints though. Once we've found a checkpoint file that is valid, checkpoints older than that should be fully written and synchronized too. We also bail out of the search when we find a checkpoint that has a lower index than the current snapshot index. Those checkpoints cannot be promoted and should be deleted. We scan through the checkpoints from most recent to least recent, so when we find a checkpoint with an older index than the snapshot, we delete that checkpoint and any older checkpoints. --- src/ra_snapshot.erl | 110 +++++++++++++++++++++++++++++++------------- 1 file changed, 79 insertions(+), 31 deletions(-) diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl index 29c7f42b..c123cfd8 100644 --- a/src/ra_snapshot.erl +++ b/src/ra_snapshot.erl @@ -220,9 +220,7 @@ pick_first_valid(UId, Mod, Dir, [S | Rem]) -> pick_first_valid(UId, Mod, Dir, Rem) end. -find_checkpoints(#?MODULE{uid = UId, - module = Module, - current = Current, +find_checkpoints(#?MODULE{current = Current, checkpoint_directory = CheckpointDir} = State) -> case ra_lib:is_dir(CheckpointDir) of false -> @@ -235,37 +233,87 @@ find_checkpoints(#?MODULE{uid = UId, I end, {ok, CPFiles0} = prim_file:list_dir(CheckpointDir), + %% Reverse-sort the files so that the most recent checkpoints + %% come first. CPFiles = lists:reverse(lists:sort(CPFiles0)), - Checkpoints = - lists:filtermap( - fun(File) -> - CP = filename:join(CheckpointDir, File), - case Module:validate(CP) of - ok -> - {ok, #{index := Idx, term := Term}} = - Module:read_meta(CP), - case Idx > CurrentIdx of - true -> - {true, {Idx, Term}}; - false -> - ?INFO("ra_snapshot: ~ts: removing " - "checkpoint ~s as was older than the " - "current snapshot.", - [UId, CP]), - delete(CheckpointDir, {Idx, Term}), - false - end; - Err -> - ?INFO("ra_snapshot: ~ts: removing checkpoint ~s as " - "did not validate. Err: ~w", - [UId, CP, Err]), - ra_lib:recursive_delete(CP), - false - end - end, CPFiles), - State#?MODULE{checkpoints = Checkpoints} + find_checkpoints(CPFiles, State, CurrentIdx, []) end. +find_checkpoints([], State, _CurrentIdx, Checkpoints) -> + %% Reverse so that the most recent checkpoints come first. + State#?MODULE{checkpoints = lists:reverse(Checkpoints)}; +find_checkpoints( + [File | Files], + #?MODULE{uid = UId, + module = Module, + checkpoint_directory = CheckpointDir} = State, + CurrentIdx, []) -> + %% When we haven't yet found a valid checkpoint (`Checkpoints =:= []`), + %% fully validate the file with the `ra_snapshot:validate/1` callback to + %% ensure that we can recover from the latest checkpoint. + CP = filename:join(CheckpointDir, File), + case Module:validate(CP) of + ok -> + {ok, #{index := Idx, term := Term}} = Module:read_meta(CP), + case Idx > CurrentIdx of + true -> + find_checkpoints(Files, State, CurrentIdx, [{Idx, Term}]); + false -> + %% If the first valid checkpoint is older than the snapshot + %% index then all checkpoints in `Files` are older as well. + %% Delete all checkpoints and bail. + delete_stale_checkpoints( + UId, CheckpointDir, [File | Files]), + State + end; + Err -> + ?INFO("ra_snapshot: ~ts: removing checkpoint ~s as it did not " + "validate. Err: ~w", + [UId, CP, Err]), + _ = ra_lib:recursive_delete(CP), + find_checkpoints(Files, State, CurrentIdx, []) + end; +find_checkpoints( + [File | Files], + #?MODULE{uid = UId, + module = Module, + checkpoint_directory = CheckpointDir} = State, + CurrentIdx, Checkpoints) -> + %% If a valid checkpoint has already been found, delay validation for the + %% older remaining checkpoints until we attempt to promote them. This + %% reduces I/O usage on startup. + CP = filename:join(CheckpointDir, File), + case Module:read_meta(CP) of + {ok, #{index := Idx, term := Term}} -> + case Idx > CurrentIdx of + true -> + find_checkpoints( + Files, State, CurrentIdx, [{Idx, Term} | Checkpoints]); + false -> + %% If this checkpoint is older than the current snapshot + %% then all later `Files` will be as well. Delete them and + %% finish searching. + delete_stale_checkpoints( + UId, CheckpointDir, [File | Files]), + find_checkpoints([], State, CurrentIdx, Checkpoints) + end; + Err -> + ?INFO("ra_snapshot: ~ts: removing checkpoint ~s as metadata could " + "not be read. Err: ~w", + [UId, CP, Err]), + _ = ra_lib:recursive_delete(CP), + find_checkpoints(Files, State, CurrentIdx, Checkpoints) + end. + +delete_stale_checkpoints(UId, CheckpointDir, Files) -> + [begin + CP = filename:join(CheckpointDir, File), + ?INFO("ra_snapshot: ~ts: removing checkpoint ~s as it was older than " + "the current snapshot.", [UId, CP]), + _ = ra_lib:recursive_delete(CP) + end || File <- Files], + ok. + -spec init_ets() -> ok. init_ets() -> TableFlags = [set, From 48ecb89a39da216fd99f8cb1afb05920e8bd7040 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Thu, 25 Jul 2024 11:23:12 -0400 Subject: [PATCH 2/3] ra_checkpoint_SUITE: Show recovery from older valid checkpoint --- test/ra_checkpoint_SUITE.erl | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/test/ra_checkpoint_SUITE.erl b/test/ra_checkpoint_SUITE.erl index bfcdfe28..8640d7f5 100644 --- a/test/ra_checkpoint_SUITE.erl +++ b/test/ra_checkpoint_SUITE.erl @@ -261,28 +261,38 @@ init_recover_corrupt(Config) -> State0 = init_state(Config), %% Take a checkpoint. - Meta = meta(55, 2, [node()]), - MacRef = ?FUNCTION_NAME, - {State1, _} = ra_snapshot:begin_snapshot(Meta, MacRef, checkpoint, State0), + Meta1 = meta(55, 2, [node()]), + {State1, _} = ra_snapshot:begin_snapshot(Meta1, ?FUNCTION_NAME, checkpoint, State0), + State2 = receive + {ra_log_event, {snapshot_written, {55, 2} = IdxTerm1, checkpoint}} -> + ra_snapshot:complete_snapshot(IdxTerm1, checkpoint, State1) + after 1000 -> + error(snapshot_event_timeout) + end, + + %% Take another checkpoint. + Meta2 = meta(165, 2, [node()]), + {State3, _} = ra_snapshot:begin_snapshot(Meta2, ?FUNCTION_NAME, checkpoint, State2), receive - {ra_log_event, {snapshot_written, {55, 2} = IdxTerm, checkpoint}} -> - _ = ra_snapshot:complete_snapshot(IdxTerm, checkpoint, State1), + {ra_log_event, {snapshot_written, {165, 2} = IdxTerm2, checkpoint}} -> + _ = ra_snapshot:complete_snapshot(IdxTerm2, checkpoint, State3), ok after 1000 -> error(snapshot_event_timeout) end, - %% Delete the file but leave the directory intact. + %% Corrupt the latest checkpoint by deleting the snapshot.dat file but + %% leaving the checkpoint directory intact. CorruptDir = filename:join(?config(checkpoint_dir, Config), - ra_lib:zpad_hex(2) ++ "_" ++ ra_lib:zpad_hex(55)), + ra_lib:zpad_hex(2) ++ "_" ++ ra_lib:zpad_hex(165)), ok = file:delete(filename:join(CorruptDir, "snapshot.dat")), Recover = init_state(Config), %% The checkpoint isn't recovered and the directory is cleaned up. undefined = ra_snapshot:pending(Recover), undefined = ra_snapshot:current(Recover), - undefined = ra_snapshot:latest_checkpoint(Recover), - {error, no_current_snapshot} = ra_snapshot:recover(Recover), + {55, 2} = ra_snapshot:latest_checkpoint(Recover), + {ok, Meta1, ?FUNCTION_NAME} = ra_snapshot:recover(Recover), false = filelib:is_dir(CorruptDir), ok. From b7fe2da227aae1b2824f1e38673964f2f51ea77f Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 14 Aug 2024 10:45:38 +0100 Subject: [PATCH 3/3] Formatting and comment fix --- src/ra_snapshot.erl | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl index c123cfd8..69502c4d 100644 --- a/src/ra_snapshot.erl +++ b/src/ra_snapshot.erl @@ -242,12 +242,11 @@ find_checkpoints(#?MODULE{current = Current, find_checkpoints([], State, _CurrentIdx, Checkpoints) -> %% Reverse so that the most recent checkpoints come first. State#?MODULE{checkpoints = lists:reverse(Checkpoints)}; -find_checkpoints( - [File | Files], - #?MODULE{uid = UId, - module = Module, - checkpoint_directory = CheckpointDir} = State, - CurrentIdx, []) -> +find_checkpoints([File | Files], + #?MODULE{uid = UId, + module = Module, + checkpoint_directory = CheckpointDir} = State, + CurrentIdx, []) -> %% When we haven't yet found a valid checkpoint (`Checkpoints =:= []`), %% fully validate the file with the `ra_snapshot:validate/1` callback to %% ensure that we can recover from the latest checkpoint. @@ -273,15 +272,14 @@ find_checkpoints( _ = ra_lib:recursive_delete(CP), find_checkpoints(Files, State, CurrentIdx, []) end; -find_checkpoints( - [File | Files], - #?MODULE{uid = UId, - module = Module, - checkpoint_directory = CheckpointDir} = State, - CurrentIdx, Checkpoints) -> - %% If a valid checkpoint has already been found, delay validation for the - %% older remaining checkpoints until we attempt to promote them. This - %% reduces I/O usage on startup. +find_checkpoints([File | Files], + #?MODULE{uid = UId, + module = Module, + checkpoint_directory = CheckpointDir} = State, + CurrentIdx, Checkpoints) -> + %% If a valid checkpoint has already been found it is assumed all older + %% checkpoints are also valid. Scanning all can introduce a lot of + %% additional I/O during recovery. CP = filename:join(CheckpointDir, File), case Module:read_meta(CP) of {ok, #{index := Idx, term := Term}} ->