Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise Websocket performance (again) #1658

Merged
merged 4 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
# Don't include Erlang.mk in diffs.
erlang.mk -diff

# Don't change line endings in our test data on Windows.
test/ws_perf_SUITE_data/*.txt -text
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ PROJECT_REGISTERED = cowboy_clock

PLT_APPS = public_key ssl # ct_helper gun common_test inets
CT_OPTS += -ct_hooks cowboy_ct_hook [] # -boot start_sasl
#CT_OPTS += +JPperf true +S 1

# Dependencies.

LOCAL_DEPS = crypto

DEPS = cowlib ranch
dep_cowlib = git https://github.com/ninenines/cowlib master
dep_ranch = git https://github.com/ninenines/ranch 1.8.0
dep_ranch = git https://github.com/ninenines/ranch 1.8.x

ifeq ($(COWBOY_QUICER),1)
DEPS += quicer
Expand Down
38 changes: 29 additions & 9 deletions src/cowboy_http2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@
state :: {module, any()}
}).

%% We don't want to reset the idle timeout too often,
%% so we don't reset it on data. Instead we reset the
%% number of ticks we have observed. We divide the
%% timeout value by a value and that value becomes
%% the number of ticks at which point we can drop
%% the connection. This value is the number of ticks.
-define(IDLE_TIMEOUT_TICKS, 10).

-record(state, {
parent = undefined :: pid(),
ref :: ranch:ref(),
Expand All @@ -95,6 +103,7 @@

%% Timer for idle_timeout; also used for goaway timers.
timer = undefined :: undefined | reference(),
idle_timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS,
essen marked this conversation as resolved.
Show resolved Hide resolved

%% Remote address and port for the connection.
peer = undefined :: {inet:ip_address(), inet:port_number()},
Expand Down Expand Up @@ -166,7 +175,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader,
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
http2_status=sequence, http2_machine=HTTP2Machine})),
http2_status=sequence, http2_machine=HTTP2Machine}), 0),
safe_setopts_active(State),
case Buffer of
<<>> -> loop(State, Buffer);
Expand Down Expand Up @@ -222,7 +231,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
<<"connection">> => <<"Upgrade">>,
<<"upgrade">> => <<"h2c">>
}, ?MODULE, undefined}), %% @todo undefined or #{}?
State = set_idle_timeout(init_rate_limiting(State2#state{http2_status=sequence})),
State = set_idle_timeout(init_rate_limiting(State2#state{http2_status=sequence}), 0),
%% In the case of HTTP/1.1 Upgrade we cannot send the Preface
%% until we send the 101 response.
ok = maybe_socket_error(State, Transport:send(Socket, Preface)),
Expand All @@ -249,7 +258,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
receive
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
parse(set_idle_timeout(State), << Buffer/binary, Data/binary >>);
parse(State#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>);
{Closed, Socket} when Closed =:= element(2, Messages) ->
Reason = case State#state.http2_status of
closing -> {stop, closed, 'The client is going away.'};
Expand All @@ -273,8 +282,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
%% Timeouts.
{timeout, TimerRef, idle_timeout} ->
terminate(State, {stop, timeout,
'Connection idle longer than configuration allows.'});
tick_idle_timeout(State, Buffer);
{timeout, Ref, {shutdown, Pid}} ->
cowboy_children:shutdown_timeout(Children, Ref, Pid),
loop(State, Buffer);
Expand Down Expand Up @@ -302,12 +310,24 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
end.

set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef})
tick_idle_timeout(State=#state{idle_timeout_num=?IDLE_TIMEOUT_TICKS}, _) ->
terminate(State, {stop, timeout,
'Connection idle longer than configuration allows.'});
tick_idle_timeout(State=#state{idle_timeout_num=TimeoutNum}, Buffer) ->
loop(set_idle_timeout(State, TimeoutNum + 1), Buffer).

set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef}, _)
when Status =:= closing_initiated orelse Status =:= closing,
TimerRef =/= undefined ->
State;
set_idle_timeout(State=#state{opts=Opts}) ->
set_timeout(State, maps:get(idle_timeout, Opts, 60000), idle_timeout).
set_idle_timeout(State=#state{opts=Opts}, TimeoutNum) ->
case maps:get(idle_timeout, Opts, 60000) of
infinity ->
State#state{timer=undefined};
Timeout ->
set_timeout(State#state{idle_timeout_num=TimeoutNum},
Timeout div ?IDLE_TIMEOUT_TICKS, idle_timeout)
end.

set_timeout(State=#state{timer=TimerRef0}, Timeout, Message) ->
ok = case TimerRef0 of
Expand All @@ -323,7 +343,7 @@ set_timeout(State=#state{timer=TimerRef0}, Timeout, Message) ->
maybe_reset_idle_timeout(State=#state{opts=Opts}) ->
case maps:get(reset_idle_timeout_on_send, Opts, false) of
true ->
set_idle_timeout(State);
State#state{idle_timeout_num=0};
false ->
State
end.
Expand Down
49 changes: 34 additions & 15 deletions src/cowboy_websocket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@
}.
-export_type([opts/0]).

%% We don't want to reset the idle timeout too often,
%% so we don't reset it on data. Instead we reset the
%% number of ticks we have observed. We divide the
%% timeout value by a value and that value becomes
%% the number of ticks at which point we can drop
%% the connection. This value is the number of ticks.
-define(IDLE_TIMEOUT_TICKS, 10).
essen marked this conversation as resolved.
Show resolved Hide resolved

-record(state, {
parent :: undefined | pid(),
ref :: ranch:ref(),
Expand All @@ -86,6 +94,7 @@
handler :: module(),
key = undefined :: undefined | binary(),
timeout_ref = undefined :: undefined | reference(),
timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS,
messages = undefined :: undefined | {atom(), atom(), atom()}
| {atom(), atom(), atom(), atom()},
hibernate = false :: boolean(),
Expand Down Expand Up @@ -297,9 +306,9 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
undefined -> undefined;
_ -> Transport:messages()
end,
State = loop_timeout(State0#state{parent=Parent,
State = set_idle_timeout(State0#state{parent=Parent,
ref=Ref, socket=Socket, transport=Transport,
key=undefined, messages=Messages}),
key=undefined, messages=Messages}, 0),
%% We call parse_header/3 immediately because there might be
%% some data in the buffer that was sent along with the handshake.
%% While it is not allowed by the protocol to send frames immediately,
Expand Down Expand Up @@ -373,28 +382,39 @@ before_loop(State=#state{hibernate=true}, HandlerState, ParseState) ->
before_loop(State, HandlerState, ParseState) ->
loop(State, HandlerState, ParseState).

-spec loop_timeout(#state{}) -> #state{}.
loop_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}) ->
-spec set_idle_timeout(#state{}, 0..?IDLE_TIMEOUT_TICKS) -> #state{}.

set_idle_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}, TimeoutNum) ->
%% Most of the time we don't need to cancel the timer since it
%% will have triggered already. But this call is harmless so
%% it is kept to simplify the code as we do need to cancel when
essen marked this conversation as resolved.
Show resolved Hide resolved
%% options are changed dynamically.
_ = case PrevRef of
undefined -> ignore;
PrevRef -> erlang:cancel_timer(PrevRef)
end,
case maps:get(idle_timeout, Opts, 60000) of
infinity ->
State#state{timeout_ref=undefined};
State#state{timeout_ref=undefined, timeout_num=TimeoutNum};
Timeout ->
TRef = erlang:start_timer(Timeout, self(), ?MODULE),
State#state{timeout_ref=TRef}
TRef = erlang:start_timer(Timeout div ?IDLE_TIMEOUT_TICKS, self(), ?MODULE),
State#state{timeout_ref=TRef, timeout_num=TimeoutNum}
end.

-define(reset_idle_timeout(State), State#state{timeout_num=0}).

tick_idle_timeout(State=#state{timeout_num=?IDLE_TIMEOUT_TICKS}, HandlerState, _) ->
websocket_close(State, HandlerState, timeout);
tick_idle_timeout(State=#state{timeout_num=TimeoutNum}, HandlerState, ParseState) ->
before_loop(set_idle_timeout(State, TimeoutNum + 1), HandlerState, ParseState).

-spec loop(#state{}, any(), parse_state()) -> no_return().
loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
timeout_ref=TRef}, HandlerState, ParseState) ->
receive
%% Socket messages. (HTTP/1.1)
{OK, Socket, Data} when OK =:= element(1, Messages) ->
State2 = loop_timeout(State),
parse(State2, HandlerState, ParseState, Data);
parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
{Closed, Socket} when Closed =:= element(2, Messages) ->
terminate(State, HandlerState, {error, closed});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
Expand All @@ -407,18 +427,16 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
%% Body reading messages. (HTTP/2)
{request_body, _Ref, nofin, Data} ->
maybe_read_body(State),
State2 = loop_timeout(State),
parse(State2, HandlerState, ParseState, Data);
parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
%% @todo We need to handle this case as if it was an {error, closed}
%% but not before we finish processing frames. We probably should have
%% a check in before_loop to let us stop looping if a flag is set.
{request_body, _Ref, fin, _, Data} ->
maybe_read_body(State),
State2 = loop_timeout(State),
parse(State2, HandlerState, ParseState, Data);
parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
%% Timeouts.
{timeout, TRef, ?MODULE} ->
websocket_close(State, HandlerState, timeout);
tick_idle_timeout(State, HandlerState, ParseState);
{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
before_loop(State, HandlerState, ParseState);
%% System messages.
Expand Down Expand Up @@ -600,7 +618,8 @@ commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) ->
commands([{set_options, SetOpts}|Tail], State0=#state{opts=Opts}, Data) ->
State = case SetOpts of
#{idle_timeout := IdleTimeout} ->
loop_timeout(State0#state{opts=Opts#{idle_timeout => IdleTimeout}});
%% We reset the number of ticks when changing the idle_timeout option.
set_idle_timeout(State0#state{opts=Opts#{idle_timeout => IdleTimeout}}, 0);
_ ->
State0
end,
Expand Down
14 changes: 10 additions & 4 deletions test/cowboy_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,16 @@ common_all() ->
end.

common_groups(Tests) ->
Opts = case os:getenv("NO_PARALLEL") of
false -> [parallel];
_ -> []
Parallel = case os:getenv("NO_PARALLEL") of
false -> parallel;
_ -> no_parallel
end,
common_groups(Tests, Parallel).

common_groups(Tests, Parallel) ->
Opts = case Parallel of
parallel -> [parallel];
no_parallel -> []
end,
Groups = [
{http, Opts, Tests},
Expand All @@ -113,7 +120,6 @@ common_groups(Tests) ->
Groups
end.


init_common_groups(Name = http, Config, Mod) ->
init_http(Name, #{
env => #{dispatch => Mod:init_dispatch(Config)}
Expand Down
12 changes: 10 additions & 2 deletions test/ws_autobahn_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ end_per_group(Listener, _Config) ->

init_dispatch() ->
cowboy_router:compile([
{"localhost", [
{"host.docker.internal", [
{"/ws_echo", ws_echo, []}
]}
]).
Expand All @@ -73,7 +73,15 @@ autobahn_fuzzingclient(Config) ->
end.

do_start_port(Config, Pid) ->
Port = open_port({spawn, "wstest -m fuzzingclient -s " ++ config(data_dir, Config) ++ "client.json"},
% Cmd = "wstest -m fuzzingclient -s " ++ config(data_dir, Config) ++ "client.json",
Cmd = "sudo docker run --rm "
"-v " ++ config(data_dir, Config) ++ "/client.json:/client.json "
"-v " ++ config(priv_dir, Config) ++ "/reports:/reports "
"--add-host=host.docker.internal:host-gateway "
"--name fuzzingclient "
"crossbario/autobahn-testsuite "
"wstest -m fuzzingclient -s client.json",
Port = open_port({spawn, Cmd},
[{line, 10000}, {cd, config(priv_dir, Config)}, binary, eof]),
do_receive_infinity(Port, Pid).

Expand Down
2 changes: 1 addition & 1 deletion test/ws_autobahn_SUITE_data/client.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

"servers": [{
"agent": "Cowboy",
"url": "ws://localhost:33080/ws_echo",
"url": "ws://host.docker.internal:33080/ws_echo",
"options": {"version": 18}
}],

Expand Down
Loading
Loading