Skip to content

Commit

Permalink
Change directory name generation function for queue indexes and vhosts.
Browse files Browse the repository at this point in the history
It was a mistake to relate on md5(term_to_binary(..)) to generate
vhost and queue directory name, since term_to_binary format can
change.
Migration functions take care of renaming directories.
  • Loading branch information
Daniil Fedotov committed Jun 7, 2017
1 parent 14765d8 commit 0ed7568
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 48 deletions.
111 changes: 65 additions & 46 deletions src/rabbit_queue_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
%% ---- Journal details ----

-define(JOURNAL_FILENAME, "journal.jif").
-define(QUEUE_NAME_STUB_FILE, ".queue_name").

-define(PUB_PERSIST_JPREFIX, 2#00).
-define(PUB_TRANS_JPREFIX, 2#01).
Expand Down Expand Up @@ -204,7 +205,9 @@
%% optimisation
pre_publish_cache,
%% optimisation
delivered_cache}).
delivered_cache,
%% queue name resource record
queue_name}).

-record(segment, {
%% segment ID (an integer)
Expand Down Expand Up @@ -295,7 +298,8 @@ erase(Name) ->
erase_index_dir(Dir).

%% used during variable queue purge when there are no pending acks
reset_state(#qistate{ dir = Dir,
reset_state(#qistate{ queue_name = Name,
dir = Dir,
on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun,
journal_handle = JournalHdl }) ->
Expand All @@ -304,7 +308,7 @@ reset_state(#qistate{ dir = Dir,
_ -> file_handle_cache:close(JournalHdl)
end,
ok = erase_index_dir(Dir),
blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun).
blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun).

init(Name, OnSyncFun, OnSyncMsgFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
Expand Down Expand Up @@ -520,32 +524,6 @@ start(VHost, DurableQueueNames) ->
{OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.


read_global_recovery_terms(DurableQueueNames) ->
ok = rabbit_recovery_terms:open_global_table(),

DurableTerms =
lists:foldl(
fun(QName, RecoveryTerms) ->
DirName = queue_name_to_dir_name(QName),
RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of
{error, _} -> non_clean_shutdown;
{ok, Terms} -> Terms
end,
[RecoveryInfo | RecoveryTerms]
end, [], DurableQueueNames),

ok = rabbit_recovery_terms:close_global_table(),
%% The backing queue interface requires that the queue recovery terms
%% which come back from start/1 are in the same order as DurableQueueNames
OrderedTerms = lists:reverse(DurableTerms),
{OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.

cleanup_global_recovery_terms() ->
rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]),
rabbit_recovery_terms:delete_global_table(),
ok.


stop(VHost) -> rabbit_recovery_terms:stop(VHost).

all_queue_directory_names(VHost) ->
Expand All @@ -567,10 +545,9 @@ erase_index_dir(Dir) ->
end.

blank_state(QueueName) ->
blank_state_dir(queue_dir(QueueName)).

blank_state_dir(Dir) ->
blank_state_dir_funs(Dir,
Dir = queue_dir(QueueName),
blank_state_name_dir_funs(QueueName,
Dir,
fun (_) -> ok end,
fun (_) -> ok end).

Expand All @@ -581,7 +558,20 @@ queue_dir(#resource{ virtual_host = VHost } = QueueName) ->
QueueDir = queue_name_to_dir_name(QueueName),
filename:join([VHostDir, "queues", QueueDir]).

blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) ->
queue_name_to_dir_name(#resource { kind = queue,
virtual_host = VHost,
name = QName }) ->
<<Num:128>> = erlang:md5(<<"queue", VHost/binary, QName/binary>>),
rabbit_misc:format("~.36B", [Num]).

queue_name_to_dir_name_legacy(Name = #resource { kind = queue }) ->
<<Num:128>> = erlang:md5(term_to_binary(Name)),
rabbit_misc:format("~.36B", [Num]).

queues_base_dir() ->
rabbit_mnesia:dir().

blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun) ->
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
#qistate { dir = Dir,
Expand All @@ -594,7 +584,8 @@ blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) ->
unconfirmed = gb_sets:new(),
unconfirmed_msg = gb_sets:new(),
pre_publish_cache = [],
delivered_cache = [] }.
delivered_cache = [],
queue_name = Name }.

init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
Expand Down Expand Up @@ -690,13 +681,6 @@ recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) ->
add_to_journal(RelSeq, del, Segment)),
DirtyCount + 2}.

queue_name_to_dir_name(Name = #resource { kind = queue }) ->
<<Num:128>> = erlang:md5(term_to_binary(Name)),
rabbit_misc:format("~.36B", [Num]).

queues_base_dir() ->
rabbit_mnesia:dir().

%%----------------------------------------------------------------------------
%% msg store startup delta function
%%----------------------------------------------------------------------------
Expand Down Expand Up @@ -890,9 +874,11 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
end.

get_journal_handle(State = #qistate { journal_handle = undefined,
dir = Dir }) ->
dir = Dir,
queue_name = Name }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
ok = rabbit_file:ensure_dir(Path),
ok = ensure_queue_name_stub_file(Dir, Name),
{ok, Hdl} = file_handle_cache:open_with_absolute_path(
Path, ?WRITE_MODE, [{write_buffer, infinity}]),
{Hdl, State #qistate { journal_handle = Hdl }};
Expand Down Expand Up @@ -1413,7 +1399,8 @@ store_msg_segment(_) ->




%%----------------------------------------------------------------------------
%% Migration functions
%%----------------------------------------------------------------------------

foreach_queue_index(Funs) ->
Expand Down Expand Up @@ -1467,18 +1454,50 @@ drive_transform_fun(Fun, Hdl, Contents) ->

move_to_per_vhost_stores(#resource{} = QueueName) ->
OldQueueDir = filename:join([queues_base_dir(), "queues",
queue_name_to_dir_name(QueueName)]),
queue_name_to_dir_name_legacy(QueueName)]),
NewQueueDir = queue_dir(QueueName),
case rabbit_file:is_dir(OldQueueDir) of
true ->
ok = rabbit_file:ensure_dir(NewQueueDir),
ok = rabbit_file:rename(OldQueueDir, NewQueueDir);
ok = rabbit_file:rename(OldQueueDir, NewQueueDir),
ok = ensure_queue_name_stub_file(NewQueueDir, QueueName);
false ->
rabbit_log:info("Queue index directory not found for queue ~p~n",
[QueueName])
end,
ok.

ensure_queue_name_stub_file(Dir, #resource{virtual_host = VHost, name = QName}) ->
QueueNameFile = filename:join(Dir, ?QUEUE_NAME_STUB_FILE),
file:write_file(QueueNameFile, <<"VHOST: ", VHost/binary, "\n",
"QUEUE: ", QName/binary, "\n">>).

read_global_recovery_terms(DurableQueueNames) ->
ok = rabbit_recovery_terms:open_global_table(),

DurableTerms =
lists:foldl(
fun(QName, RecoveryTerms) ->
DirName = queue_name_to_dir_name_legacy(QName),
RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of
{error, _} -> non_clean_shutdown;
{ok, Terms} -> Terms
end,
[RecoveryInfo | RecoveryTerms]
end, [], DurableQueueNames),

ok = rabbit_recovery_terms:close_global_table(),
%% The backing queue interface requires that the queue recovery terms
%% which come back from start/1 are in the same order as DurableQueueNames
OrderedTerms = lists:reverse(DurableTerms),
{OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.

cleanup_global_recovery_terms() ->
rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]),
rabbit_recovery_terms:delete_global_table(),
ok.


update_recovery_term(#resource{virtual_host = VHost} = QueueName, Term) ->
Key = queue_name_to_dir_name(QueueName),
rabbit_recovery_terms:store(VHost, Key, Term).
2 changes: 1 addition & 1 deletion src/rabbit_vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ set_limits(VHost = #vhost{}, Limits) ->


dir(Vhost) ->
<<Num:128>> = erlang:md5(term_to_binary(Vhost)),
<<Num:128>> = erlang:md5(Vhost),
rabbit_misc:format("~.36B", [Num]).

msg_store_dir_path(VHost) ->
Expand Down
2 changes: 1 addition & 1 deletion test/backing_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,7 @@ make_publish_delivered(IsPersistent, PayloadFun, PropFun, N) ->
PropFun(N, #message_properties{size = 10})}.

queue_name(Config, Name) ->
Name1 = rabbit_ct_helpers:config_to_testcase_name(Config, Name),
Name1 = iolist_to_binary(rabbit_ct_helpers:config_to_testcase_name(Config, Name)),
queue_name(Name1).

queue_name(Name) ->
Expand Down
4 changes: 4 additions & 0 deletions test/clustering_management_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,10 @@ change_cluster_node_type(Config) ->
assert_cluster_status({[Rabbit, Hare], [Hare], [Hare]},
[Rabbit, Hare]),
change_cluster_node_type(Rabbit, disc),

rabbit_control_helper:command(cluster_status, Rabbit, []),
rabbit_control_helper:command(cluster_status, Hare, []),

assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Hare]},
[Rabbit, Hare]),
change_cluster_node_type(Rabbit, ram),
Expand Down

0 comments on commit 0ed7568

Please sign in to comment.