Skip to content

Commit

Permalink
fix: stream_id exhaustion on connections (#97)
Browse files Browse the repository at this point in the history
* fix: stream_id exhaustion on connections

* fix: removed now-useless reconnect? functionality

APNS/FCM config reconnect key still remains to not break API, though now
it doesn't do anything.

Also: cleaned up some compiler warnings.

* fix: connection asks for more demand on stream completion

Previous behavior was to ask for more demand as soon as events arrived,
though this leads to unreliable timeouts and such when Kadabra can't
process fast enough.

Potential issue: The requested streams don't timeout, so theoretically a
worker could hang if there was no response. Will also need to add an
error response for any :closed messages while there are outstanding
streams (though this should never happen for FCM session_timeouts)

Also fixed: FCM Poison parse failure hard crashes due to bad pattern
match. Now logs an error instead.

* fix: connections supervised again

Crashed connections will cause a memory leak with hpack/stream workers
in Kadabra. Will need to be addressed.

* chore: bump version number

* chore: update CHANGELOG
  • Loading branch information
hpopp authored Dec 4, 2017
1 parent c7d63f4 commit 8e3e8e6
Show file tree
Hide file tree
Showing 16 changed files with 297 additions and 172 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## v1.1.2
* Auto-restart connections if max stream ID is reached
* FCM/APNS Workers now use GenStage to queue pending pushes
* Bumped minimum Kadabra version to `v0.3.5`

## v1.1.1
* Bumped minimum Kadabra version to `v0.3.4`

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ Add pigeon and kadabra as `mix.exs` dependencies:
```elixir
def deps do
[
{:pigeon, "~> 1.1.1"},
{:kadabra, "~> 0.3.4"}
{:pigeon, "~> 1.1.2"},
{:kadabra, "~> 0.3.5"}
]
end
```
Expand Down
6 changes: 3 additions & 3 deletions docs/Getting Started.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
> HTTP2-compliant wrapper for sending iOS and Android push notifications.
[![Build Status](https://travis-ci.org/codedge-llc/pigeon.svg?branch=master)](https://travis-ci.org/codedge-llc/pigeon)
[![Coverage Status](https://coveralls.io/repos/github/codedge-llc/pigeon/badge.svg?branch=v1.1.0)](https://coveralls.io/github/codedge-llc/pigeon?branch=v1.1.0)
[![Coverage Status](https://coveralls.io/repos/github/codedge-llc/pigeon/badge.svg?branch=v1.1.0)](https://coveralls.io/github/codedge-llc/pigeon)
[![Hex.pm](http://img.shields.io/hexpm/v/pigeon.svg)](https://hex.pm/packages/pigeon) [![Hex.pm](http://img.shields.io/hexpm/dt/pigeon.svg)](https://hex.pm/packages/pigeon)
[![Deps Status](https://beta.hexfaktor.org/badge/all/github/codedge-llc/pigeon.svg)](https://beta.hexfaktor.org/github/codedge-llc/pigeon)

Expand All @@ -12,8 +12,8 @@ Add pigeon and kadabra as `mix.exs` dependencies:
```elixir
def deps do
[
{:pigeon, "~> 1.1.1"},
{:kadabra, "~> 0.3.4"}
{:pigeon, "~> 1.1.2"},
{:kadabra, "~> 0.3.5"}
]
end
```
Expand Down
7 changes: 7 additions & 0 deletions lib/pigeon.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,12 @@ defmodule Pigeon do
end
end

@doc false
def start_connection(state) do
opts = [restart: :temporary, id: :erlang.make_ref]
spec = worker(Pigeon.Connection, [state], opts)
Supervisor.start_child(:pigeon, spec)
end

def debug_log?, do: Application.get_env(:pigeon, :debug_log, false)
end
4 changes: 2 additions & 2 deletions lib/pigeon/apns.ex
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ defmodule Pigeon.APNS do
end
defp push(notification, on_response, opts) do
worker_name = opts[:to] || Config.default_name
Worker.cast_push(worker_name, notification, on_response: on_response)
Worker.send_push(worker_name, notification, on_response: on_response)
end

@doc ~S"""
Expand Down Expand Up @@ -155,7 +155,7 @@ defmodule Pigeon.APNS do
on_response = fn(x) -> send pid, {ref, x} end

worker_name = opts[:to] || Config.default_name
Worker.cast_push(worker_name, notification, on_response: on_response)
Worker.send_push(worker_name, notification, on_response: on_response)

receive do
{^ref, x} -> x
Expand Down
6 changes: 3 additions & 3 deletions lib/pigeon/apns/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ defimpl Pigeon.Configurable, for: Pigeon.APNS.Config do
@spec worker_name(any) :: atom | nil
def worker_name(%Config{name: name}), do: name

@spec max_demand(any) :: non_neg_integer
def max_demand(_config), do: 1_000

@spec connect(any) :: {:ok, sock} | {:error, String.t}
def connect(%Config{uri: uri} = config) do
uri = to_charlist(uri)
Expand Down Expand Up @@ -240,9 +243,6 @@ defimpl Pigeon.Configurable, for: Pigeon.APNS.Config do
Process.send_after(self(), :ping, ping)
end

@spec reconnect?(any) :: boolean
def reconnect?(%Config{reconnect: reconnect}), do: reconnect

def close(_config) do
end

Expand Down
17 changes: 3 additions & 14 deletions lib/pigeon/configurable.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ defprotocol Pigeon.Configurable do

def handle_end_stream(config, stream, notification, on_response)

@spec max_demand(any) :: non_neg_integer
def max_demand(config)

@doc ~S"""
Schedules connection ping if necessary.
Expand All @@ -48,19 +51,5 @@ defprotocol Pigeon.Configurable do
@spec schedule_ping(any) :: no_return
def schedule_ping(config)

@doc ~S"""
Returns whether connection should reconnect if dropped.
## Examples
iex> reconnect?(%Pigeon.APNS.Config{reconnect: true})
true
iex> reconnect?(%Pigeon.FCM.Config{}) # always false
false
"""
@spec reconnect?(any) :: boolean
def reconnect?(config)

def close(config)
end
174 changes: 174 additions & 0 deletions lib/pigeon/connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
defmodule Pigeon.Connection do
@moduledoc false

defstruct config: nil,
from: nil,
socket: nil,
queue: %{},
stream_id: 1,
requested: 0,
completed: 0

use GenStage
require Logger

alias Pigeon.{Configurable, Connection}
alias Pigeon.Http2.{Client, Stream}
alias Pigeon.Worker.NotificationQueue

@limit 1_000_000_000

def handle_subscribe(:producer, _opts, from, state) do
demand = Configurable.max_demand(state.config)
GenStage.ask(from, demand)
state =
state
|> inc_requested(demand)
|> Map.put(:from, from)
{:manual, state}
end

def start_link({config, from}) do
GenStage.start_link(__MODULE__, {config, from})
end

def init(config), do: initialize_worker(config)

def initialize_worker({config, from}) do
state = %Connection{config: config, from: from}
case connect_socket(config, 0) do
{:ok, socket} ->
Configurable.schedule_ping(config)
{:consumer, %{state | socket: socket}, subscribe_to: [from]}
{:error, reason} -> {:stop, reason}
end
end

def connect_socket(_config, 3), do: {:error, :timeout}
def connect_socket(config, tries) do
case Configurable.connect(config) do
{:ok, socket} -> {:ok, socket}
{:error, _reason} -> connect_socket(config, tries + 1)
end
end

# Handle Cancels

def handle_cancel({:down, :normal}, _from, state) do
{:stop, :normal, state}
end

def handle_cancel({:down, :shutdown}, _from, state) do
{:stop, :normal, state}
end

def handle_cancel({:cancel, :closed}, _from, state) do
{:stop, :normal, state}
end

def handle_cancel({:cancel, :stream_id_exhausted}, _from, state) do
{:stop, :normal, state}
end

# Info

def handle_info(:ping, state) do
Client.default().send_ping(state.socket)
Configurable.schedule_ping(state.config)

{:noreply, [], state}
end

def handle_info({:closed, _}, %{from: from} = state) do
GenStage.cancel(from, :closed)
{:noreply, [], %{state | socket: nil}}
end

def handle_info(msg, state) do
case Client.default().handle_end_stream(msg, state) do
{:ok, %Stream{} = stream} -> process_end_stream(stream, state)
_else -> {:noreply, [], state}
end
end

def handle_events(events, _from, state) do
state =
Enum.reduce(events, state, fn({:push, notif, opts}, state) ->
send_push(state, notif, opts)
end)

{:noreply, [], state}
end

def process_end_stream(%Stream{id: stream_id} = stream,
%{queue: queue, config: config} = state) do
case NotificationQueue.pop(queue, stream_id) do
{nil, new_queue} ->
# Do nothing if no queued item for stream
{:noreply, [], %{state | queue: new_queue}}
{{notif, on_response}, new_queue} ->
Configurable.handle_end_stream(config, stream, notif, on_response)
state =
state
|> inc_completed(1)
|> dec_requested(1)
|> Map.put(:queue, new_queue)

total_requests = state.completed + state.requested
max_demand = Configurable.max_demand(state.config)
state =
if total_requests < @limit and state.requested < max_demand do
to_ask = min(@limit - total_requests, max_demand - state.requested)
GenStage.ask(state.from, to_ask)
inc_requested(state, to_ask)
else
state
end

if state.completed >= @limit do
GenStage.cancel(state.from, :stream_id_exhausted)
end
{:noreply, [], state}
end
end

def send_push(%{config: config, queue: queue} = state, notification, opts) do
headers = Configurable.push_headers(config, notification, opts)
payload = Configurable.push_payload(config, notification, opts)

Client.default().send_request(state.socket, headers, payload)

new_q = NotificationQueue.add(queue,
state.stream_id,
notification,
opts[:on_response])

state
|> inc_stream_id()
|> Map.put(:queue, new_q)
end

# Cast

def handle_cast(_msg, state) do
{:noreply, [], state}
end

# Helpers

def inc_requested(state, count) do
%{state | requested: state.requested + count}
end

def dec_requested(state, count) do
%{state | requested: state.requested - count}
end

def inc_completed(state, count) do
%{state | completed: state.completed + count}
end

def inc_stream_id(%{stream_id: stream_id} = state) do
%{state | stream_id: stream_id + 2}
end
end
2 changes: 1 addition & 1 deletion lib/pigeon/fcm.ex
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ defmodule Pigeon.FCM do

defp cast_request(worker_name, request, on_response, opts) do
opts = Keyword.put(opts, :on_response, on_response)
GenServer.cast(worker_name, {:push, request, opts})
Worker.send_push(worker_name, request, opts)
end

defp sync_push(notification, opts) do
Expand Down
16 changes: 11 additions & 5 deletions lib/pigeon/fcm/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ defimpl Pigeon.Configurable, for: Pigeon.FCM.Config do
@spec worker_name(any) :: atom | nil
def worker_name(%Config{name: name}), do: name

@spec max_demand(any) :: non_neg_integer
def max_demand(_config), do: 100

@spec connect(any) :: {:ok, sock} | {:error, String.t}
def connect(%Config{uri: uri} = config) do
case connect_socket_options(config) do
Expand Down Expand Up @@ -138,8 +141,6 @@ defimpl Pigeon.Configurable, for: Pigeon.FCM.Config do

def schedule_ping(_config), do: :ok

def reconnect?(_config), do: false

def close(_config) do
end

Expand All @@ -150,9 +151,14 @@ defimpl Pigeon.Configurable, for: Pigeon.FCM.Config do
ResultParser.parse(ids, results, on_response, notification)
end

defp parse_error(data) do
{:ok, response} = Poison.decode(data)
response["reason"] |> Macro.underscore |> String.to_existing_atom
def parse_error(data) do
case Poison.decode(data) do
{:ok, response} ->
response["reason"] |> Macro.underscore |> String.to_existing_atom
error ->
"Poison parse failed: #{inspect(error)}, body: #{inspect(data)}"
|> Logger.error
end
end

defp log_error(code, reason) do
Expand Down
Loading

0 comments on commit 8e3e8e6

Please sign in to comment.