Skip to content

Commit

Permalink
Adjust ETS storage example to use unnamed tables
Browse files Browse the repository at this point in the history
Summary:
Using a named table in the example ETS-based storage provider allows for
convenient access to the backing ETS table but causes issues when
creating and opening snapshots. Adjust the implementation to properly
track the storage position and use unnamed tables to avoid issues when
opening snapshots due to ETS's behavior of restoring the table's name
using using `file2tab`.

Differential Revision: D62262547

fbshipit-source-id: 51c4cc0cfed37502c486abe5d3ad85dffb736d07
  • Loading branch information
hsun324 authored and facebook-github-bot committed Sep 6, 2024
1 parent 63bc85d commit 7cd4b0f
Showing 1 changed file with 38 additions and 32 deletions.
70 changes: 38 additions & 32 deletions src/wa_raft_storage_ets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,82 +25,88 @@

-include("wa_raft.hrl").

%% Filename used to store ETS table in a snapshot
%% Filename used for the actual ETS table file in a snapshot
-define(SNAPSHOT_FILENAME, "data").

%% Tag used in keys for metadata stored on the behalf of RAFT
-define(METADATA_TAG, '$metadata').
%% Tag used for recording the current storage position
-define(POSITION_TAG, '$position').

-record(state, {
name :: atom(),
table :: wa_raft:table(),
partition :: wa_raft:partition(),
position = #raft_log_pos{} :: wa_raft_log:log_pos()
storage :: ets:table()
}).

-spec storage_open(atom(), #raft_identifier{}, file:filename()) -> #state{}.
storage_open(Name, #raft_identifier{table = Table, partition = Partition}, _RootDir) ->
Name = ets:new(Name, [set, named_table, public, {read_concurrency, true}, {write_concurrency, true}]),
#state{name = Name, table = Table, partition = Partition}.
Storage = ets:new(Name, [set, public, {read_concurrency, true}, {write_concurrency, true}]),
#state{name = Name, table = Table, partition = Partition, storage = Storage}.

-spec storage_close(#state{}) -> ok.
storage_close(#state{name = Name}) ->
true = ets:delete(Name),
storage_close(#state{storage = Storage}) ->
true = ets:delete(Storage),
ok.

-spec storage_position(#state{}) -> wa_raft_log:log_pos().
storage_position(#state{position = Position}) ->
Position.
storage_position(#state{storage = Storage}) ->
ets:lookup_element(Storage, ?POSITION_TAG, 2, #raft_log_pos{}).

-spec storage_apply(Command :: wa_raft_acceptor:command(), Position :: wa_raft_log:log_pos(), Storage :: #state{}) -> {ok, #state{}}.
storage_apply(noop, Position, #state{} = State) ->
{ok, State#state{position = Position}};
storage_apply({write, _Table, Key, Value}, Position, #state{name = Name} = State) ->
true = ets:insert(Name, {Key, Value}),
{ok, State#state{position = Position}};
storage_apply({delete, _Table, Key}, Position, #state{name = Name} = State) ->
true = ets:delete(Name, Key),
{ok, State#state{position = Position}}.
storage_apply(noop, Position, #state{storage = Storage} = State) ->
true = ets:insert(Storage, {?POSITION_TAG, Position}),
{ok, State};
storage_apply({write, _Table, Key, Value}, Position, #state{storage = Storage} = State) ->
true = ets:insert(Storage, [{Key, Value}, {?POSITION_TAG, Position}]),
{ok, State};
storage_apply({delete, _Table, Key}, Position, #state{storage = Storage} = State) ->
true = ets:delete(Storage, Key),
true = ets:insert(Storage, {?POSITION_TAG, Position}),
{ok, State}.

-spec storage_write_metadata(#state{}, wa_raft_storage:metadata(), wa_raft_log:log_pos(), term()) -> ok.
storage_write_metadata(#state{name = Name}, Key, Version, Value) ->
true = ets:insert(Name, {{?METADATA_TAG, Key}, {Version, Value}}),
storage_write_metadata(#state{storage = Storage}, Key, Version, Value) ->
true = ets:insert(Storage, [{{?METADATA_TAG, Key}, {Version, Value}}, {?POSITION_TAG, Version}]),
ok.

-spec storage_read(Command :: wa_raft_acceptor:command(), Position :: wa_raft_log:log_pos(), State :: #state{}) -> ok | {ok, Value :: dynamic()} | not_found.
storage_read(noop, _Position, #state{}) ->
ok;
storage_read({read, _Table, Key}, _Position, #state{name = Name}) ->
case ets:lookup(Name, Key) of
storage_read({read, _Table, Key}, _Position, #state{storage = Storage}) ->
case ets:lookup(Storage, Key) of
[{_, Value}] -> {ok, Value};
[] -> not_found
end.

-spec storage_read_metadata(#state{}, wa_raft_storage:metadata()) -> {ok, wa_raft_log:log_pos(), term()} | undefined.
storage_read_metadata(#state{name = Name}, Key) ->
case ets:lookup(Name, {?METADATA_TAG, Key}) of
storage_read_metadata(#state{storage = Storage}, Key) ->
case ets:lookup(Storage, {?METADATA_TAG, Key}) of
[{_, {Version, Value}}] -> {ok, Version, Value};
[] -> undefined
end.

-spec storage_create_snapshot(file:filename(), #state{}) -> ok | wa_raft_storage:error().
storage_create_snapshot(SnapshotPath, #state{name = Name}) ->
storage_create_snapshot(SnapshotPath, #state{storage = Storage}) ->
case filelib:ensure_path(SnapshotPath) of
ok -> ets:tab2file(Name, filename:join(SnapshotPath, ?SNAPSHOT_FILENAME));
ok -> ets:tab2file(Storage, filename:join(SnapshotPath, ?SNAPSHOT_FILENAME));
{error, Reason} -> {error, Reason}
end.

-spec storage_open_snapshot(file:filename(), wa_raft_log:log_pos(), #state{}) -> {ok, #state{}} | wa_raft_storage:error().
storage_open_snapshot(SnapshotPath, LastApplied, #state{name = Name} = State) ->
Temp = list_to_atom(atom_to_list(Name) ++ "_temp"),
Temp = ets:rename(Name, Temp),
storage_open_snapshot(SnapshotPath, SnapshotPosition, #state{storage = Storage} = State) ->
SnapshotData = filename:join(SnapshotPath, ?SNAPSHOT_FILENAME),
case ets:file2tab(SnapshotData) of
{ok, Created} ->
Created =/= Name andalso (Name = ets:rename(Created, Name)),
catch ets:delete(Temp),
{ok, State#state{position = LastApplied}};
{ok, NewStorage} ->
case ets:lookup_element(NewStorage, ?POSITION_TAG, 2, #raft_log_pos{}) of
SnapshotPosition ->
catch ets:delete(Storage),
{ok, State#state{storage = NewStorage}};
_IncorrectPosition ->
catch ets:delete(NewStorage),
{error, bad_position}
end;
{error, Reason} ->
Name = ets:rename(Temp, Name),
{error, Reason}
end.

0 comments on commit 7cd4b0f

Please sign in to comment.