Skip to content

Commit

Permalink
Use selective receive to allow for vnode to be blocked
Browse files Browse the repository at this point in the history
Block and unblock a vnode when triggering a repair.  Otherwise their is a potential race condition:

- aae_controller is triggered into repair mode, queueing all updates to be re-applied after rebuild complete
- update is applied to leveled backend by vnode
- snapshot is taken for rebuild (including update)
- message is cast to aae_controller with update ... which will be queued and applied twice
- update is queue
  • Loading branch information
martinsumner committed Mar 27, 2024
1 parent 6e39282 commit 43fab28
Showing 1 changed file with 37 additions and 9 deletions.
46 changes: 37 additions & 9 deletions src/riak_kv_vnode.erl
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@
-define(REAPER_BATCH_SIZE, 1024).
-define(ERASER_BATCH_SIZE, 1024).

-define(INIT_REBUILD_BLOCKTIME, 1000).

%% Erlang's if Bool -> thing; true -> thang end. syntax hurts my
%% brain. It scans as if true -> thing; true -> thang end. So, here is
%% a macro, ?ELSE to use in if statements. You're welcome.
Expand Down Expand Up @@ -438,21 +440,30 @@ queue_tictactreerebuild(AAECntrl, Partition, OnlyIfBroken, State) ->
fun() ->
?LOG_INFO("Starting tree rebuild for partition=~w", [Partition]),
SW = os:timestamp(),
case when_loading_complete(AAECntrl,
Preflists,
fun preflistfun/2,
OnlyIfBroken) of
BlockRequest = self(),
BlockTimeMS = ?INIT_REBUILD_BLOCKTIME,
{blocked, VnodePid} =
riak_core_vnode_master:sync_command(
{Partition, node()},
{block_vnode, BlockRequest, BlockTimeMS},
riak_kv_vnode_master,
infinity),
case when_loading_complete(
AAECntrl, Preflists, fun preflistfun/2, OnlyIfBroken) of
{ok, StoreFold, FinishFun} ->
VnodePid ! {release_vnode, BlockRequest},
Output = StoreFold(),
FinishFun(Output),
Duration =
timer:now_diff(os:timestamp(), SW) div (1000 * 1000),
?LOG_INFO("Tree rebuild complete for partition=~w" ++
" in duration=~w seconds",
[Partition, Duration]);
?LOG_INFO(
"Tree rebuild complete for partition=~w"
" in duration=~w seconds",
[Partition, Duration]);
skipped ->
?LOG_INFO("Tree rebuild skipped for partition=~w",
[Partition])
VnodePid ! {release_vnode, BlockRequest},
?LOG_INFO(
"Tree rebuild skipped for partition=~w", [Partition])
end,
ok
end,
Expand Down Expand Up @@ -1484,6 +1495,23 @@ handle_command({reset_hashtree_tokens, MinToken, MaxToken}, _Sender, State) ->
end,
{reply, ok, State};

handle_command({block_vnode, BlockRequest, BlockTimeMS}, Sender, State) ->
riak_core_vnode:reply(Sender, {blocked, self()}),
receive
{release_vnode, BlockRequest} ->
?LOG_INFO(
"Vnode block released for ~w request ~w",
[Sender, BlockRequest]),
{noreply, State}
after
BlockTimeMS ->
?LOG_WARNING(
"Vnode block request timed out after ~w for ~w request ~w",
[BlockTimeMS, Sender, BlockRequest]
),
{noreply, State}
end;

handle_command(Req, Sender, State) ->
handle_request(riak_kv_requests:request_type(Req), Req, Sender, State).

Expand Down

0 comments on commit 43fab28

Please sign in to comment.