Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split trailers from headers #242

Merged
merged 5 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 38 additions & 18 deletions lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,22 @@ defmodule Finch do
The stream function given to `stream/5`.
"""
@type stream(acc) ::
({:status, integer} | {:headers, Mint.Types.headers()} | {:data, binary}, acc -> acc)
({:status, integer}
| {:headers, Mint.Types.headers()}
| {:data, binary}
| {:trailers, Mint.Types.headers()},
acc ->
acc)

@typedoc """
The stream function given to `stream_while/5`.
"""
@type stream_while(acc) ::
({:status, integer} | {:headers, Mint.Types.headers()} | {:data, binary}, acc ->
({:status, integer}
| {:headers, Mint.Types.headers()}
| {:data, binary}
| {:trailers, Mint.Types.headers()},
acc ->
{:cont, acc} | {:halt, acc})

@doc """
Expand Down Expand Up @@ -279,19 +288,20 @@ defmodule Finch do

## Stream commands

* `{:status, status}` - the status of the http response
* `{:headers, headers}` - the headers of the http response
* `{:data, data}` - a streaming section of the http body
* `{:status, status}` - the http response status
* `{:headers, headers}` - the http response headers
* `{:data, data}` - a streaming section of the http response body
* `{:trailers, trailers}` - the http response trailers

## Options

Shares options with `request/3`.

## Examples

path = "/tmp/big-file.zip"
path = "/tmp/archive.zip"
file = File.open!(path, [:write, :exclusive])
url = "https://domain.com/url/big-file.zip"
url = "https://example.com/archive.zip"
request = Finch.build(:get, url)

Finch.stream(request, MyFinch, nil, fn
Expand Down Expand Up @@ -335,19 +345,20 @@ defmodule Finch do

## Stream commands

* `{:status, status}` - the status of the http response
* `{:headers, headers}` - the headers of the http response
* `{:data, data}` - a streaming section of the http body
* `{:status, status}` - the http response status
* `{:headers, headers}` - the http response headers
* `{:data, data}` - a streaming section of the http response body
* `{:trailers, trailers}` - the http response trailers

## Options

Shares options with `request/3`.

## Examples

path = "/tmp/big-file.zip"
path = "/tmp/archive.zip"
file = File.open!(path, [:write, :exclusive])
url = "https://domain.com/url/big-file.zip"
url = "https://example.com/archive.zip"
request = Finch.build(:get, url)

Finch.stream_while(request, MyFinch, nil, fn
Expand Down Expand Up @@ -399,20 +410,29 @@ defmodule Finch do

def request(%Request{} = req, name, opts) do
request_span req, name do
acc = {nil, [], []}
acc = {nil, [], [], []}

fun = fn
{:status, value}, {_, headers, body} -> {:cont, {value, headers, body}}
{:headers, value}, {status, headers, body} -> {:cont, {status, headers ++ value, body}}
{:data, value}, {status, headers, body} -> {:cont, {status, headers, [value | body]}}
{:status, value}, {_, headers, body, trailers} ->
{:cont, {value, headers, body, trailers}}

{:headers, value}, {status, headers, body, trailers} ->
{:cont, {status, headers ++ value, body, trailers}}

{:data, value}, {status, headers, body, trailers} ->
{:cont, {status, headers, [value | body], trailers}}

{:trailers, value}, {status, headers, body, trailers} ->
{:cont, {status, headers, body, trailers ++ value}}
end

with {:ok, {status, headers, body}} <- __stream__(req, name, acc, fun, opts) do
with {:ok, {status, headers, body, trailers}} <- __stream__(req, name, acc, fun, opts) do
{:ok,
%Response{
status: status,
headers: headers,
body: body |> Enum.reverse() |> IO.iodata_to_binary()
body: body |> Enum.reverse() |> IO.iodata_to_binary(),
trailers: trailers
}}
end
end
Expand Down
114 changes: 88 additions & 26 deletions lib/finch/http1/conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,20 @@ defmodule Finch.Conn do
{:ok, mint} ->
Telemetry.stop(:send, start_time, metadata, extra_measurements)
start_time = Telemetry.start(:recv, metadata, extra_measurements)
response = receive_response([], acc, fun, mint, ref, receive_timeout)
resp_metadata = %{status: nil, headers: [], trailers: []}

response =
receive_response(
[],
acc,
fun,
mint,
ref,
receive_timeout,
:headers,
resp_metadata
)

handle_response(response, conn, metadata, start_time, extra_measurements)

{:error, mint, error} ->
Expand Down Expand Up @@ -175,43 +188,90 @@ defmodule Finch.Conn do

defp handle_response(response, conn, metadata, start_time, extra_measurements) do
case response do
{:ok, mint, acc, {status, headers}} ->
metadata = Map.merge(metadata, %{status: status, headers: headers})
{:ok, mint, acc, resp_metadata} ->
metadata = Map.merge(metadata, resp_metadata)
Telemetry.stop(:recv, start_time, metadata, extra_measurements)
{:ok, %{conn | mint: mint}, acc}

{:error, mint, error, {status, headers}} ->
metadata = Map.merge(metadata, %{error: error, status: status, headers: headers})
{:error, mint, error, resp_metadata} ->
metadata = Map.merge(metadata, Map.put(resp_metadata, :error, error))
Telemetry.stop(:recv, start_time, metadata, extra_measurements)
{:error, %{conn | mint: mint}, error}
end
end

defp receive_response(entries, acc, fun, mint, ref, timeout, status \\ nil, headers \\ [])

defp receive_response([{:done, ref} | _], acc, _fun, mint, ref, _timeout, status, headers) do
{:ok, mint, acc, {status, headers}}
defp receive_response(
entries,
acc,
fun,
mint,
ref,
timeout,
fields,
resp_metadata
)

defp receive_response(
[{:done, ref} | _],
acc,
_fun,
mint,
ref,
_timeout,
_fields,
resp_metadata
) do
{:ok, mint, acc, resp_metadata}
end

defp receive_response(_, _acc, _fun, mint, _ref, timeout, status, headers) when timeout < 0 do
{:error, mint, %Mint.TransportError{reason: :timeout}, {status, headers}}
defp receive_response(
_,
_acc,
_fun,
mint,
_ref,
timeout,
_fields,
resp_metadata
)
when timeout < 0 do
{:error, mint, %Mint.TransportError{reason: :timeout}, resp_metadata}
end

defp receive_response([], acc, fun, mint, ref, timeout, status, headers) do
defp receive_response([], acc, fun, mint, ref, timeout, fields, resp_metadata) do
start_time = System.monotonic_time(:millisecond)

case MintHTTP1.recv(mint, 0, timeout) do
{:ok, mint, entries} ->
elapsed_time = System.monotonic_time(:millisecond) - start_time
timeout = timeout - elapsed_time
receive_response(entries, acc, fun, mint, ref, timeout, status, headers)

receive_response(
entries,
acc,
fun,
mint,
ref,
timeout,
fields,
resp_metadata
)

{:error, mint, error, _responses} ->
{:error, mint, error, {status, headers}}
{:error, mint, error, resp_metadata}
end
end

defp receive_response([entry | entries], acc, fun, mint, ref, timeout, status, headers) do
defp receive_response(
[entry | entries],
acc,
fun,
mint,
ref,
timeout,
fields,
resp_metadata
) do
case entry do
{:status, ^ref, value} ->
case fun.({:status, value}, acc) do
Expand All @@ -223,20 +283,22 @@ defmodule Finch.Conn do
mint,
ref,
timeout,
value,
headers
fields,
%{resp_metadata | status: value}
)

{:halt, acc} ->
{:ok, mint} = Mint.HTTP1.close(mint)
{:ok, mint, acc, {status, headers}}
{:ok, mint, acc, resp_metadata}

other ->
raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
end

{:headers, ^ref, value} ->
case fun.({:headers, value}, acc) do
resp_metadata = update_in(resp_metadata, [fields], &(&1 ++ value))

case fun.({fields, value}, acc) do
{:cont, acc} ->
receive_response(
entries,
Expand All @@ -245,13 +307,13 @@ defmodule Finch.Conn do
mint,
ref,
timeout,
status,
headers ++ value
fields,
resp_metadata
)

{:halt, acc} ->
{:ok, mint} = Mint.HTTP1.close(mint)
{:ok, mint, acc, {status, headers}}
{:ok, mint, acc, resp_metadata}

other ->
raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
Expand All @@ -267,20 +329,20 @@ defmodule Finch.Conn do
mint,
ref,
timeout,
status,
headers
:trailers,
resp_metadata
)

{:halt, acc} ->
{:ok, mint} = Mint.HTTP1.close(mint)
{:ok, mint, acc, {status, headers}}
{:ok, mint, acc, resp_metadata}

other ->
raise ArgumentError, "expected {:cont, acc} or {:halt, acc}, got: #{inspect(other)}"
end

{:error, ^ref, error} ->
{:error, mint, error, {status, headers}}
{:error, mint, error, resp_metadata}
end
end
end
17 changes: 10 additions & 7 deletions lib/finch/http2/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule Finch.HTTP2.Pool do
fail_safe_timeout = if is_integer(timeout), do: max(2000, timeout * 2), else: :infinity

try do
response_waiting_loop(acc, fun, request_ref, monitor, fail_safe_timeout)
response_waiting_loop(acc, fun, request_ref, monitor, fail_safe_timeout, :headers)
catch
kind, error ->
Telemetry.exception(:recv, recv_start, kind, error, __STACKTRACE__, %{request: request})
Expand Down Expand Up @@ -77,9 +77,9 @@ defmodule Finch.HTTP2.Pool do
{__MODULE__, {pool, make_ref()}}
end

defp response_waiting_loop(acc, fun, request_ref, monitor_ref, fail_safe_timeout)
defp response_waiting_loop(acc, fun, request_ref, monitor_ref, fail_safe_timeout, fields)

defp response_waiting_loop(acc, fun, request_ref, monitor_ref, fail_safe_timeout) do
defp response_waiting_loop(acc, fun, request_ref, monitor_ref, fail_safe_timeout, fields) do
receive do
{^request_ref, {:status, value}} ->
case fun.({:status, value}, acc) do
Expand All @@ -89,7 +89,8 @@ defmodule Finch.HTTP2.Pool do
fun,
request_ref,
monitor_ref,
fail_safe_timeout
fail_safe_timeout,
fields
)

{:halt, acc} ->
Expand All @@ -102,14 +103,15 @@ defmodule Finch.HTTP2.Pool do
end

{^request_ref, {:headers, value}} ->
case fun.({:headers, value}, acc) do
case fun.({fields, value}, acc) do
{:cont, acc} ->
response_waiting_loop(
acc,
fun,
request_ref,
monitor_ref,
fail_safe_timeout
fail_safe_timeout,
fields
)

{:halt, acc} ->
Expand All @@ -129,7 +131,8 @@ defmodule Finch.HTTP2.Pool do
fun,
request_ref,
monitor_ref,
fail_safe_timeout
fail_safe_timeout,
:trailers
)

{:halt, acc} ->
Expand Down
6 changes: 4 additions & 2 deletions lib/finch/response.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ defmodule Finch.Response do
defstruct [
:status,
body: "",
headers: []
headers: [],
trailers: []
]

@type t :: %Response{
status: Mint.Types.status(),
body: binary(),
headers: Mint.Types.headers()
headers: Mint.Types.headers(),
trailers: Mint.Types.headers()
}
end
Loading
Loading