Skip to content

Commit

Permalink
QQ first draft hi/lo priority queue
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed May 16, 2024
1 parent c681321 commit 93dfb0f
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 88 deletions.
61 changes: 28 additions & 33 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
-dialyzer(no_improper_lists).

-include("rabbit_fifo.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").

-define(STATE, ?MODULE).

Expand Down Expand Up @@ -739,8 +738,9 @@ apply(_Meta, Cmd, State) ->
{State, ok, []}.

convert_v3_to_v4(#{system_time := Ts},
#rabbit_fifo{messages = Messages0,
consumers = Consumers0} = StateV3) ->
StateV3) ->
Messages0 = rabbit_fifo_v3:get_field(messages, StateV3),
Consumers0 = rabbit_fifo_v3:get_field(consumers, StateV3),
Messages = rabbit_fifo_q:from_lqueue(Messages0),
Consumers = maps:map(
fun (_CKey, #consumer{checked_out = Ch0} = C) ->
Expand All @@ -750,8 +750,23 @@ convert_v3_to_v4(#{system_time := Ts},
end, Ch0),
C#consumer{checked_out = Ch}
end, Consumers0),
StateV3#?MODULE{messages = Messages,
consumers = Consumers}.
#?MODULE{cfg = rabbit_fifo_v3:get_field(cfg, StateV3),
messages = Messages,
messages_total = rabbit_fifo_v3:get_field(messages_total, StateV3),
returns = rabbit_fifo_v3:get_field(returns, StateV3),
enqueue_count = rabbit_fifo_v3:get_field(enqueue_count, StateV3),
enqueuers = rabbit_fifo_v3:get_field(enqueuers, StateV3),
ra_indexes = rabbit_fifo_v3:get_field(ra_indexes, StateV3),
release_cursors = rabbit_fifo_v3:get_field(release_cursors, StateV3),
consumers = Consumers,
% consumers that require further service are queued here
service_queue = rabbit_fifo_v3:get_field(service_queue, StateV3),
dlx = rabbit_fifo_v3:get_field(dlx, StateV3),
msg_bytes_enqueue = rabbit_fifo_v3:get_field(msg_bytes_enqueue, StateV3),
msg_bytes_checkout = rabbit_fifo_v3:get_field(msg_bytes_checkout, StateV3),
waiting_consumers = rabbit_fifo_v3:get_field(waiting_consumers, StateV3),
last_active = rabbit_fifo_v3:get_field(last_active, StateV3),
msg_cache = rabbit_fifo_v3:get_field(msg_cache, StateV3)}.

purge_node(Meta, Node, State, Effects) ->
lists:foldl(fun(Pid, {S0, E0}) ->
Expand Down Expand Up @@ -1605,19 +1620,6 @@ drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects) ->
{State0, Effects}
end.

maybe_set_msg_ttl(#basic_message{content = #content{properties = none}},
RaCmdTs, Header,
#?STATE{cfg = #cfg{msg_ttl = PerQueueMsgTTL}}) ->
update_expiry_header(RaCmdTs, PerQueueMsgTTL, Header);
maybe_set_msg_ttl(#basic_message{content = #content{properties = Props}},
RaCmdTs, Header,
#?STATE{cfg = #cfg{msg_ttl = PerQueueMsgTTL}}) ->
%% rabbit_quorum_queue will leave the properties decoded if and only if
%% per message message TTL is set.
%% We already check in the channel that expiration must be valid.
{ok, PerMsgMsgTTL} = rabbit_basic:parse_expiration(Props),
TTL = min(PerMsgMsgTTL, PerQueueMsgTTL),
update_expiry_header(RaCmdTs, TTL, Header);
maybe_set_msg_ttl(Msg, RaCmdTs, Header,
#?STATE{cfg = #cfg{msg_ttl = MsgTTL}}) ->
case mc:is(Msg) of
Expand Down Expand Up @@ -1832,10 +1834,10 @@ update_smallest_raft_index(IncomingRaftIdx, Reply,
#?STATE{cfg = Cfg,
release_cursors = Cursors0} = State0,
Effects) ->
Total = messages_total(State0),
% Total = messages_total(State0),
%% TODO: optimise
case smallest_raft_index(State0) of
undefined when Total == 0 ->
undefined ->
% there are no messages on queue anymore and no pending enqueues
% we can forward release_cursor all the way until
% the last received command, hooray
Expand All @@ -1846,8 +1848,8 @@ update_smallest_raft_index(IncomingRaftIdx, Reply,
release_cursors = lqueue:new(),
enqueue_count = 0},
{State, Reply, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
undefined ->
{State0, Reply, Effects};
% undefined ->
% {State0, Reply, Effects};
Smallest when is_integer(Smallest) ->
case find_next_cursor(Smallest, Cursors0) of
empty ->
Expand Down Expand Up @@ -2077,7 +2079,7 @@ take_next_msg(#?STATE{returns = Returns0,
case rabbit_fifo_q:out(Messages0) of
{empty, _} ->
empty;
{{value, ?MSG(RaftIdx, _) = Msg}, Messages} ->
{_P, ?MSG(RaftIdx, _) = Msg, Messages} ->
%% add index here
Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0),
{Msg, State#?STATE{messages = Messages,
Expand Down Expand Up @@ -2418,7 +2420,8 @@ normalize(#?STATE{ra_indexes = _Indexes,
release_cursors = Cursors,
dlx = DlxState} = State) ->
State#?STATE{returns = lqueue:from_list(lqueue:to_list(Returns)),
messages = rabbit_fifo_q:from_lqueue(Messages),
messages = rabbit_fifo_q:normalize(Messages,
rabbit_fifo_q:new()),
release_cursors = lqueue:from_list(lqueue:to_list(Cursors)),
dlx = rabbit_fifo_dlx:normalize(DlxState)}.

Expand Down Expand Up @@ -2528,9 +2531,6 @@ add_bytes_return(Header,
State#?STATE{msg_bytes_checkout = Checkout - Size,
msg_bytes_enqueue = Enqueue + Size}.

message_size(#basic_message{content = Content}) ->
#content{payload_fragments_rev = PFR} = Content,
iolist_size(PFR);
message_size(B) when is_binary(B) ->
byte_size(B);
message_size(Msg) ->
Expand Down Expand Up @@ -2652,12 +2652,7 @@ smallest_raft_index(#?STATE{messages = Messages,
ra_indexes = Indexes,
dlx = DlxState}) ->
SmallestDlxRaIdx = rabbit_fifo_dlx:smallest_raft_index(DlxState),
SmallestMsgsRaIdx = case rabbit_fifo_q:get(Messages) of
?MSG(I, _) when is_integer(I) ->
I;
_ ->
undefined
end,
SmallestMsgsRaIdx = rabbit_fifo_q:get_lowest_index(Messages),
SmallestRaIdx = rabbit_fifo_index:smallest(Indexes),
lists:min([SmallestDlxRaIdx, SmallestMsgsRaIdx, SmallestRaIdx]).

Expand Down
87 changes: 61 additions & 26 deletions deps/rabbit/src/rabbit_fifo_q.erl
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
-module(rabbit_fifo_q).

-include("rabbit_fifo.hrl").
-export([
new/0,
in/3,
out/1,
get/1,
len/1,
from_lqueue/1
from_lqueue/1,
normalize/2,
get_lowest_index/1
]).

-define(WEIGHT, 2).
Expand All @@ -26,14 +29,16 @@
new() ->
#?MODULE{}.

-spec in(hi | lo, term(), state()) -> state().
-spec in(hi | lo, msg(), state()) -> state().
in(hi, Item, #?MODULE{hi = Hi, len = Len} = State) ->
State#?MODULE{hi = queue:in(Item, Hi),
len = Len + 1};
in(lo, Item, #?MODULE{lo = Lo, len = Len} = State) ->
State#?MODULE{lo = queue:in(Item, Lo),
len = Len + 1}.

-spec out(state()) ->
{empty, state()} | {hi | lo, msg(), state()}.
out(#?MODULE{len = 0} = S) ->
{empty, S};
out(#?MODULE{hi = Hi0,
Expand All @@ -45,29 +50,30 @@ out(#?MODULE{hi = Hi0,
%% try lo before hi
case queue:out(Lo0) of
{empty, _} ->
{{value, _} = Ret, Hi} = queue:out(Hi0),
{Ret, State#?MODULE{hi = Hi,
dequeue_counter = 0,
len = Len - 1}};
{Ret, Lo} ->
{Ret, State#?MODULE{lo = Lo,
dequeue_counter = 0,
len = Len - 1}}
{{value, Ret}, Hi} = queue:out(Hi0),
{hi, Ret, State#?MODULE{hi = Hi,
dequeue_counter = 0,
len = Len - 1}};
{{value, Ret}, Lo} ->
{lo, Ret, State#?MODULE{lo = Lo,
dequeue_counter = 0,
len = Len - 1}}
end;
false ->
case queue:out(Hi0) of
{empty, _} ->
{{value, _} = Ret, Lo} = queue:out(Lo0),
{Ret, State#?MODULE{lo = Lo,
dequeue_counter = C + 1,
len = Len - 1}};
{Ret, Hi} ->
{Ret, State#?MODULE{hi = Hi,
dequeue_counter = C + 1,
len = Len - 1}}
{{value, Ret}, Lo} = queue:out(Lo0),
{lo, Ret, State#?MODULE{lo = Lo,
dequeue_counter = C + 1,
len = Len - 1}};
{{value, Ret}, Hi} ->
{hi, Ret, State#?MODULE{hi = Hi,
dequeue_counter = C + 1,
len = Len - 1}}
end
end.

-spec get(state()) -> empty | msg().
get(#?MODULE{len = 0}) ->
empty;
get(#?MODULE{hi = Hi0,
Expand All @@ -78,27 +84,56 @@ get(#?MODULE{hi = Hi0,
%% try lo before hi
case queue:peek(Lo0) of
empty ->
queue:peek(Hi0);
{value, _} = Ret ->
{value, Ret} = queue:peek(Hi0),
Ret;
{value, Ret} ->
Ret
end;
false ->
case queue:peek(Hi0) of
empty ->
queue:peek(Lo0);
{value, _} = Ret ->
{value, Ret} = queue:peek(Lo0),
Ret;
{value, Ret} ->
Ret
end
end.

-spec len(state()) -> non_neg_integer().
len(#?MODULE{len = Len}) ->
Len.

-spec from_lqueue(lqueue:lqueue(msg())) -> state().
from_lqueue(LQ) ->
lqueue:fold(
fun (Item, Acc) ->
in(lo, Item, Acc)
end, new(), LQ).
lqueue:fold(fun (Item, Acc) ->
in(lo, Item, Acc)
end, new(), LQ).

-spec normalize(state(), state()) -> state().
normalize(Q0, Acc) ->
case out(Q0) of
{empty, _} ->
Acc;
{P, Msg, Q} ->
normalize(Q, in(P, Msg, Acc))
end.

-spec get_lowest_index(state()) -> undefined | ra:index().
get_lowest_index(#?MODULE{len = 0}) ->
undefined;
get_lowest_index(#?MODULE{hi = Hi, lo = Lo}) ->
case queue:peek(Hi) of
empty ->
{value, ?MSG(LoIdx, _)} = queue:peek(Lo),
LoIdx;
{value, ?MSG(HiIdx, _)} ->
case queue:peek(Lo) of
{value, ?MSG(LoIdx, _)} ->
max(HiIdx, LoIdx);
empty ->
HiIdx
end
end.

%% internals

Expand Down
19 changes: 18 additions & 1 deletion deps/rabbit/src/rabbit_fifo_v3.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@
make_update_config/1,
make_garbage_collection/0,
convert_v1_to_v2/1,
convert_v2_to_v3/1
convert_v2_to_v3/1,

get_field/2
]).

-ifdef(TEST).
Expand Down Expand Up @@ -766,6 +768,21 @@ convert_v2_to_v3(#rabbit_fifo{consumers = ConsumersV2} = StateV2) ->
end, ConsumersV2),
StateV2#rabbit_fifo{consumers = ConsumersV3}.

get_field(Field, State) ->
Fields = record_info(fields, ?STATE),
Index = record_index_of(Field, Fields),
element(Index, State).

record_index_of(F, Fields) ->
index_of(2, F, Fields).

index_of(_, F, []) ->
exit({field_not_found, F});
index_of(N, F, [F | _]) ->
N;
index_of(N, F, [_ | T]) ->
index_of(N+1, F, T).

convert_consumer_v2_to_v3(C = #consumer{cfg = Cfg = #consumer_cfg{credit_mode = simple_prefetch,
meta = #{prefetch := Prefetch}}}) ->
C#consumer{cfg = Cfg#consumer_cfg{credit_mode = {simple_prefetch, Prefetch}}};
Expand Down
Loading

0 comments on commit 93dfb0f

Please sign in to comment.