Skip to content

Commit

Permalink
WIP Support AMQP 1.0 token renewal
Browse files Browse the repository at this point in the history
Closes #9259.

 ## What?
Allow an AMQP 1.0 client to renew an OAuth 2.0 token before it expires.

 ## Why?
This allows clients to keep the AMQP connection open instead of having
to create a new connection whenever the token expires.

 ## How?
As explained in #9259 (comment)
the client can `PUT` a new token on HTTP API v2 path `/auth/tokens`.
RabbitMQ will then:
1. Store the new token on the given connection.
2. Recheck access to the connection's vhost.
3. Clear all permission caches in the AMQP sessions.
4. Recheck write permissions to exchanges for links publishing to
   RabbitMQ, and recheck read permissions from queues for links
   consuming from RabbitMQ. The latter complies with the user
   expectation in #11364.

TODOs:
* Check new log process metadata
* Update docs
  • Loading branch information
ansd committed Oct 29, 2024
1 parent 3df9675 commit eff36e1
Show file tree
Hide file tree
Showing 8 changed files with 411 additions and 55 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_access_control.erl
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ check_user_id0(ClaimedUserName, #user{username = ActualUserName,
end.

-spec update_state(User :: rabbit_types:user(), NewState :: term()) ->
{'ok', rabbit_types:auth_user()} |
{'ok', rabbit_types:user()} |
{'refused', string()} |
{'error', any()}.

Expand Down
14 changes: 13 additions & 1 deletion deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,19 @@ handle_http_req(<<"GET">>,
Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, DstName),
Bindings = [B || B = #binding{key = K} <- Bindings0, K =:= Key],
RespPayload = encode_bindings(Bindings),
{<<"200">>, RespPayload, PermCaches}.
{<<"200">>, RespPayload, PermCaches};

handle_http_req(<<"PUT">>,
[<<"auth">>, <<"tokens">>],
_Query,
ReqPayload,
_Vhost,
_User,
ConnPid,
PermCaches) ->
{binary, Token} = ReqPayload,
ok = rabbit_amqp_reader:set_credential(ConnPid, Token),
{<<"204">>, null, PermCaches}.

decode_queue({map, KVList}) ->
M = lists:foldl(
Expand Down
84 changes: 57 additions & 27 deletions deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

-export([init/1,
info/2,
mainloop/2]).
mainloop/2,
set_credential/2]).

-export([system_continue/3,
system_terminate/4,
Expand Down Expand Up @@ -53,6 +54,7 @@
channel_max :: non_neg_integer(),
auth_mechanism :: sasl_init_unprocessed | {binary(), module()},
auth_state :: term(),
credential_timer :: undefined | reference(),
properties :: undefined | {map, list(tuple())}
}).

Expand Down Expand Up @@ -139,6 +141,11 @@ server_properties() ->
Props = [{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}} | Props1],
{map, Props}.

-spec set_credential(pid(), binary()) -> ok.
set_credential(Pid, Credential) ->
Pid ! {set_credential, Credential},
ok.

%%--------------------------------------------------------------------------

inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
Expand Down Expand Up @@ -243,6 +250,8 @@ handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
State;
handle_other(terminate_connection, _State) ->
stop;
handle_other({set_credential, Cred}, State) ->
set_credential0(Cred, State);
handle_other(credential_expired, State) ->
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, "credential expired", []),
handle_exception(State, 0, Error);
Expand Down Expand Up @@ -416,15 +425,17 @@ handle_connection_frame(
},
helper_sup = HelperSupPid,
sock = Sock} = State0) ->
logger:update_process_metadata(#{amqp_container => ContainerId}),
Vhost = vhost(Hostname),
logger:update_process_metadata(#{amqp_container => ContainerId,
vhost => Vhost,
user => Username}),
ok = check_user_loopback(State0),
ok = check_vhost_exists(Vhost, State0),
ok = check_vhost_alive(Vhost),
ok = rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}),
ok = check_vhost_connection_limit(Vhost, Username),
ok = check_user_connection_limit(Username),
ok = ensure_credential_expiry_timer(User),
Timer = maybe_start_credential_expiry_timer(User),
rabbit_core_metrics:auth_attempt_succeeded(<<>>, Username, amqp10),
notify_auth(user_authentication_success, Username, State0),
rabbit_log_connection:info(
Expand Down Expand Up @@ -499,7 +510,8 @@ handle_connection_frame(
outgoing_max_frame_size = OutgoingMaxFrameSize,
channel_max = EffectiveChannelMax,
properties = Properties,
timeout = ReceiveTimeoutMillis},
timeout = ReceiveTimeoutMillis,
credential_timer = Timer},
heartbeater = Heartbeater},
State = start_writer(State1),
HostnameVal = case Hostname of
Expand Down Expand Up @@ -871,39 +883,57 @@ check_user_connection_limit(Username) ->
end.


%% TODO Provide a means for the client to refresh the credential.
%% This could be either via:
%% 1. SASL (if multiple authentications are allowed on the same AMQP 1.0 connection), see
%% https://datatracker.ietf.org/doc/html/rfc4422#section-3.8 , or
%% 2. Claims Based Security (CBS) extension, see https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html
%% and https://github.com/rabbitmq/rabbitmq-server/issues/9259
%% 3. Simpler variation of 2. where a token is put to a special /token node.
%%
%% If the user does not refresh their credential on time (the only implementation currently),
%% close the entire connection as we must assume that vhost access could have been revoked.
%%
%% If the user refreshes their credential on time (to be implemented), the AMQP reader should
%% 1. rabbit_access_control:check_vhost_access/4
%% 2. send a message to all its sessions which should then erase the permission caches and
%% re-check all link permissions (i.e. whether reading / writing to exchanges / queues is still allowed).
%% 3. cancel the current timer, and set a new timer
%% similary as done for Stream connections, see https://github.com/rabbitmq/rabbitmq-server/issues/10292
ensure_credential_expiry_timer(User) ->
set_credential0(Cred,
State = #v1{connection = #v1_connection{
user = User0,
vhost = Vhost,
credential_timer = OldTimer} = Conn,
tracked_channels = Chans,
sock = Sock}) ->
rabbit_log:info("updating credential", []),
case rabbit_access_control:update_state(User0, Cred) of
{ok, User} ->
try rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}) of
ok ->
maps:foreach(fun(_ChanNum, Pid) ->
rabbit_amqp_session:reset_authz(Pid, User)
end, Chans),
case OldTimer of
undefined -> ok;
Ref -> erlang:cancel_timer(Ref, [{info, false}])
end,
NewTimer = maybe_start_credential_expiry_timer(User),
State#v1{connection = Conn#v1_connection{
user = User,
credential_timer = NewTimer}}
catch _:Reason ->
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"access to vhost ~s failed for new credential: ~p",
[Vhost, Reason]),
handle_exception(State, 0, Error)
end;
Err ->
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"credential update failed: ~p",
[Err]),
handle_exception(State, 0, Error)
end.

maybe_start_credential_expiry_timer(User) ->
case rabbit_access_control:expiry_timestamp(User) of
never ->
ok;
undefined;
Ts when is_integer(Ts) ->
Time = (Ts - os:system_time(second)) * 1000,
rabbit_log:debug(
"Credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)",
"credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)",
[Time, Ts]),
case Time > 0 of
true ->
_TimerRef = erlang:send_after(Time, self(), credential_expired),
ok;
erlang:send_after(Time, self(), credential_expired);
false ->
protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"Credential expired ~b ms ago", [abs(Time)])
"credential expired ~b ms ago", [abs(Time)])
end
end.

Expand Down
47 changes: 45 additions & 2 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@
list_local/0,
conserve_resources/3,
check_resource_access/4,
check_read_permitted_on_topic/4
check_read_permitted_on_topic/4,
reset_authz/2
]).

-export([init/1,
Expand Down Expand Up @@ -393,6 +394,10 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
handle_max = ClientHandleMax}}) ->
process_flag(trap_exit, true),
rabbit_process_flag:adjust_for_message_handling_proc(),
logger:update_process_metadata(#{channel_number => ChannelNum,
connection => ConnName,
vhost => Vhost,
user => User#user.username}),

ok = pg:join(pg_scope(), self(), self()),
Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
Expand Down Expand Up @@ -480,6 +485,10 @@ list_local() ->
conserve_resources(Pid, Source, {_, Conserve, _}) ->
gen_server:cast(Pid, {conserve_resources, Source, Conserve}).

-spec reset_authz(pid(), rabbit_types:user()) -> ok.
reset_authz(Pid, User) ->
gen_server:cast(Pid, {reset_authz, User}).

handle_call(Msg, _From, State) ->
Reply = {error, {not_understood, Msg}},
reply(Reply, State).
Expand Down Expand Up @@ -574,7 +583,18 @@ handle_cast({conserve_resources, Alarm, Conserve},
noreply(State);
handle_cast(refresh_config, #state{cfg = #cfg{vhost = Vhost} = Cfg} = State0) ->
State = State0#state{cfg = Cfg#cfg{trace_state = rabbit_trace:init(Vhost)}},
noreply(State).
noreply(State);
handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) ->
State1 = State0#state{
permission_cache = [],
topic_permission_cache = [],
cfg = Cfg#cfg{user = User}},
try recheck_authz(State1) of
State ->
noreply(State)
catch exit:#'v1_0.error'{} = Error ->
log_error_and_close_session(Error, State1)
end.

log_error_and_close_session(
Error, State = #state{cfg = #cfg{reader_pid = ReaderPid,
Expand Down Expand Up @@ -3522,6 +3542,29 @@ check_topic_authorisation(#exchange{type = topic,
check_topic_authorisation(_, _, _, _, Cache) ->
Cache.

recheck_authz(#state{incoming_links = IncomingLinks,
outgoing_links = OutgoingLinks,
permission_cache = Cache0,
cfg = #cfg{user = User}
} = State) ->
rabbit_log:debug("rechecking link authorizations", []),
Cache1 = maps:fold(
fun(_Handle, #incoming_link{exchange = X}, Cache) ->
case X of
#exchange{name = XName} ->
check_resource_access(XName, write, User, Cache);
#resource{} = XName ->
check_resource_access(XName, write, User, Cache);
to ->
Cache
end
end, Cache0, IncomingLinks),
Cache2 = maps:fold(
fun(_Handle, #outgoing_link{queue_name = QName}, Cache) ->
check_resource_access(QName, read, User, Cache)
end, Cache1, OutgoingLinks),
State#state{permission_cache = Cache2}.

check_user_id(Mc, User) ->
case rabbit_access_control:check_user_id(Mc, User) of
ok ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ force_event_refresh(Ref) ->
list_queue_states(Pid) ->
gen_server2:call(Pid, list_queue_states).

-spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}.
-spec update_user_state(pid(), rabbit_types:user()) -> 'ok' | {error, channel_terminated}.

update_user_state(Pid, UserState) when is_pid(Pid) ->
case erlang:is_process_alive(Pid) of
Expand Down
21 changes: 20 additions & 1 deletion deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
declare_exchange/3,
bind_exchange/5,
unbind_exchange/5,
delete_exchange/2
delete_exchange/2,

set_token/2
].

-define(TIMEOUT, 20_000).
Expand Down Expand Up @@ -381,6 +383,23 @@ delete_exchange(LinkPair, ExchangeName) ->
Err
end.

%% Renew OAuth 2.0 token.
-spec set_token(link_pair(), binary()) ->
ok | {error, term()}.
set_token(LinkPair, Token) ->
Props = #{subject => <<"PUT">>,
to => <<"/auth/tokens">>},
Body = {binary, Token},
case request(LinkPair, Props, Body) of
{ok, Resp} ->
case is_success(Resp) of
true -> ok;
false -> {error, Resp}
end;
Err ->
Err
end.

-spec request(link_pair(), amqp10_msg:amqp10_properties(), amqp10_prim()) ->
{ok, Response :: amqp10_msg:amqp10_msg()} | {error, term()}.
request(#link_pair{session = Session,
Expand Down
Loading

0 comments on commit eff36e1

Please sign in to comment.