Skip to content

Commit

Permalink
wip: migrating to vhost base messge store
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniil Fedotov committed Oct 20, 2016
1 parent b3fc71a commit 492e23b
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 46 deletions.
4 changes: 2 additions & 2 deletions src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->

Dir = filename:join(BaseDir, atom_to_list(Server)),

{ok, IndexModule} = application:get_env(msg_store_index_module),
{ok, IndexModule} = application:get_env(rabbit,msg_store_index_module),
rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),

AttemptFileSummaryRecovery =
Expand Down Expand Up @@ -758,7 +758,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
DyingIndex = ets:new(rabbit_msg_store_dying_client_index,
[set, public, {keypos, #dying_client.client_ref}]),

{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
{ok, FileSizeLimit} = application:get_env(rabbit,msg_store_file_size_limit),

{ok, GCPid} = rabbit_msg_store_gc:start_link(
#gc_state { dir = Dir,
Expand Down
16 changes: 8 additions & 8 deletions src/rabbit_queue_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).

-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
-export([scan_queue_segments/3]).

-define(CLEAN_FILENAME, "clean.dot").

Expand Down Expand Up @@ -660,20 +661,19 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
end.

queue_index_walker_reader(QueueName, Gatherer) ->
State = blank_state(QueueName),
ok = scan_segments(
ok = scan_queue_segments(
fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
when is_binary(MsgId) ->
gatherer:sync_in(Gatherer, {MsgId, 1});
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
_IsAcked, Acc) ->
Acc
end, ok, State),
end, ok, QueueName),
ok = gatherer:finish(Gatherer).

scan_segments(Fun, Acc, State) ->
State1 = #qistate { segments = Segments, dir = Dir } =
recover_journal(State),
scan_queue_segments(Fun, Acc, QueueName) ->
State = #qistate { segments = Segments, dir = Dir } =
recover_journal(blank_state(QueueName)),
Result = lists:foldr(
fun (Seg, AccN) ->
segment_entries_foldr(
Expand All @@ -682,8 +682,8 @@ scan_segments(Fun, Acc, State) ->
Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
IsPersistent, IsDelivered, IsAcked, AccM)
end, AccN, segment_find_or_new(Seg, Dir, Segments))
end, Acc, all_segment_nums(State1)),
{_SegmentCounts, _State} = terminate(State1),
end, Acc, all_segment_nums(State)),
{_SegmentCounts, _State} = terminate(State),
Result.

%%----------------------------------------------------------------------------
Expand Down
96 changes: 60 additions & 36 deletions src/rabbit_variable_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
%% exported for testing only
-export([start_msg_store/2, stop_msg_store/0, init/6]).

-export([move_messages_to_vhost_store/0]).
-include_lib("stdlib/include/qlc.hrl").

%%----------------------------------------------------------------------------
%% Messages, and their position in the queue, can be in memory or on
%% disk, or both. Persistent messages will have both message and
Expand Down Expand Up @@ -334,8 +337,11 @@
}).

-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
-define(PERSISTENT_MSG_STORE_SUP, msg_store_persistent_vhost).
-define(TRANSIENT_MSG_STORE_SUP, msg_store_transient_vhost).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).

-define(QUEUE, lqueue).

-include("rabbit.hrl").
Expand All @@ -344,7 +350,8 @@
%%----------------------------------------------------------------------------

-rabbit_upgrade({multiple_routing_keys, local, []}).
-rabbit_upgrade({move_messages_to_vhost_store, local, []}).
% -rabbit_upgrade({move_messages_to_vhost_store, local, []}). requires mnesia, requires rabbit_sup, requires worker_pool, requires fhc
-compile(export_all).

-type seq_id() :: non_neg_integer().

Expand Down Expand Up @@ -452,6 +459,8 @@
%% Public API
%%----------------------------------------------------------------------------



start(DurableQueues) ->
{AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
start_msg_store(
Expand All @@ -470,23 +479,23 @@ stop() ->

start_msg_store(Refs, StartFunState) ->
VHosts = rabbit_vhost:list(),
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup,
[?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
[?TRANSIENT_MSG_STORE_SUP, rabbit_mnesia:dir(),
undefined, {fun (ok) -> finished end, ok}]),
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
[?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
[?PERSISTENT_MSG_STORE_SUP, rabbit_mnesia:dir(),
Refs, StartFunState]),
lists:foreach(
fun(VHost) ->
rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost),
rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost)
rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost),
rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost)
end,
VHosts),
ok.

stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE_SUP).

init(Queue, Recover, Callback) ->
init(
Expand All @@ -504,11 +513,11 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
VHost = QueueName#resource.virtual_host,
init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
true -> msg_store_client_init(?PERSISTENT_MSG_STORE_SUP,
MsgOnDiskFun, AsyncCallback, VHost);
false -> undefined
end,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined,
msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined,
AsyncCallback, VHost));

%% We can be recovering a transient queue if it crashed
Expand All @@ -518,7 +527,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
VHost = QueueName#resource.virtual_host,
{PersistentClient, ContainsCheckFun} =
case IsDurable of
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE_SUP, PRef,
MsgOnDiskFun, AsyncCallback,
VHost),
{C, fun (MsgId) when is_binary(MsgId) ->
Expand All @@ -528,14 +537,14 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
end};
false -> {undefined, fun(_MsgId) -> false end}
end,
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE_SUP,
undefined, AsyncCallback,
VHost),
{DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
rabbit_msg_store_vhost_sup:successfully_recovered_state(
?PERSISTENT_MSG_STORE, VHost),
?PERSISTENT_MSG_STORE_SUP, VHost),
ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
Expand Down Expand Up @@ -1208,7 +1217,7 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
Callback, VHost).

msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE_SUP),
rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun,
fun () ->
Callback(?MODULE, CloseFDsFun)
Expand Down Expand Up @@ -2666,23 +2675,26 @@ multiple_routing_keys() ->

%% Assumes message store is not running
transform_storage(TransformFun) ->
transform_store(?PERSISTENT_MSG_STORE, TransformFun),
transform_store(?TRANSIENT_MSG_STORE, TransformFun).
transform_store(?PERSISTENT_MSG_STORE_SUP, TransformFun),
transform_store(?TRANSIENT_MSG_STORE_SUP, TransformFun).

transform_store(Store, TransformFun) ->
rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).

move_messages_to_vhost_store() ->
Queues = list_persistent_queues(),
Queues = rabbit_variable_queue:list_persistent_queues(),
% Maybe recover old store.
{RecoveryTerms, StartFunState} = start_recovery_terms(Queues),
OldStore = run_old_persistent_store(RecoveryTerms, StartFunState),
NewStoreSup = start_new_store_sup(),
Migrations = spawn_for_each(fun(Queue) ->
{RecoveryTerms, StartFunState} = rabbit_variable_queue:start_recovery_terms(Queues),
OldStore = rabbit_variable_queue:run_old_persistent_store(RecoveryTerms, StartFunState),
NewStoreSup = rabbit_variable_queue:start_new_store_sup(),
lists:map(fun(Queue) ->
migrate_queue(Queue, OldStore, NewStoreSup)
end, Queues),
wait(Migrations),
% Migrations = spawn_for_each(fun(Queue) ->
% migrate_queue(Queue, OldStore, NewStoreSup)
% end, Queues),
% wait(Migrations),
delete_old_store(OldStore).

migrate_queue(Queue, OldStore, NewStoreSup) ->
Expand All @@ -2697,8 +2709,19 @@ migrate_queue(Queue, OldStore, NewStoreSup) ->
_ -> OldC
end
end,
OldStoreClient,
Queue).

walk_queue_index(Fun, Client, #amqqueue{name = QueueName}) ->
% WARNING: State is being recovered and terminated. This can cause side effects!
rabbit_queue_index:scan_queue_segments(
fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, _IsAcked, ClientState)
when is_binary(MsgId) ->
Fun(MsgId, ClientState);
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, ClientState) ->
ClientState
end, Client, QueueName).

spawn_for_each(Fun, List) ->
Ref = erlang:make_ref(),
Self = self(),
Expand Down Expand Up @@ -2752,7 +2775,8 @@ list_persistent_queues() ->
end).

start_recovery_terms(Queues) ->
{AllTerms, StartFunState} = rabbit_queue_index:start(Queues),
QueueNames = [Name || #amqqueue{name = Name} <- Queues],
{AllTerms, StartFunState} = rabbit_queue_index:start(QueueNames),
Refs = [Ref || Terms <- AllTerms,
Terms /= non_clean_shutdown,
begin
Expand All @@ -2762,30 +2786,30 @@ start_recovery_terms(Queues) ->
{Refs, StartFunState}.

run_old_persistent_store(Refs, StartFunState) ->
OldStoreName = old_persistent_msg_store,
OldStoreName = ?PERSISTENT_MSG_STORE,
ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store,
[OldStoreName, rabbit_mnesia:dir(),
Refs, StartFunState]),
OldStoreName.

run_persistent_store(Vhost) ->


?PERSISTENT_MSG_STORE.

start_new_store_sup() ->
% Start persistent store sup without recovery.
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
[?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
[?PERSISTENT_MSG_STORE_SUP, rabbit_mnesia:dir(),
undefined, {fun (ok) -> finished end, ok}]),
?PERSISTENT_MSG_STORE.
?PERSISTENT_MSG_STORE_SUP.

delete_old_store(OldStore) ->
gen_server:stop(OldStore),
rabbit_file:recursive_delete(
filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])).

rabbit_file:recursive_delete([filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]).



setup() ->
application:load(rabbit),
mnesia:start(),
rabbit_sup:start_link(),
rabbit:start_fhc(),
rabbit_sup:start_restartable_child(rabbit_guid),
rabbit_sup:start_supervisor_child(worker_pool_sup).

0 comments on commit 492e23b

Please sign in to comment.