Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Use distributed gproc to handle multiple nodes #26

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,8 @@ config :poxa,
app_key: get_env("POXA_APP_KEY") || "app_key",
app_secret: get_env("POXA_SECRET") || "secret",
app_id: get_env("POXA_APP_ID") || "app_id"

config :gproc, gproc_dist: :all

config :renode,
nodes: []
8 changes: 4 additions & 4 deletions lib/poxa/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ defmodule Poxa.Channel do
"""
@spec occupied?(binary) :: boolean
def occupied?(channel) do
match = {{:p, :l, {:pusher, channel}}, :_, :_}
match = {{:p, :g, {:pusher, channel}}, :_, :_}
:gproc.select_count([{match, [], [true]}]) != 0
end

Expand All @@ -77,7 +77,7 @@ defmodule Poxa.Channel do
"""
@spec all(pid | :_) :: [binary]
def all(pid \\ :_) do
match = {{:p, :l, {:pusher, :'$1'}}, pid, :_}
match = {{:p, :g, {:pusher, :'$1'}}, pid, :_}
:gproc.select([{match, [], [:'$1']}]) |> Enum.uniq
end

Expand All @@ -86,7 +86,7 @@ defmodule Poxa.Channel do
"""
@spec subscribed?(binary, pid) :: boolean
def subscribed?(channel, pid) do
match = {{:p, :l, {:pusher, channel}}, pid, :_}
match = {{:p, :g, {:pusher, channel}}, pid, :_}
:gproc.select_count([{match, [], [true]}]) != 0
end

Expand All @@ -95,7 +95,7 @@ defmodule Poxa.Channel do
"""
@spec subscription_count(binary) :: non_neg_integer
def subscription_count(channel) do
match = {{:p, :l, {:pusher, channel}}, :_, :_}
match = {{:p, :g, {:pusher, channel}}, :_, :_}
:gproc.select_count([{match, [], [true]}])
end
end
4 changes: 2 additions & 2 deletions lib/poxa/presence_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Poxa.PresenceChannel do
"""
@spec users(binary) :: [binary | integer]
def users(channel) do
match = {{:p, :l, {:pusher, channel}}, :_, :'$1'}
match = {{:p, :g, {:pusher, channel}}, :_, :'$1'}
:gproc.select([{match, [], [:'$1']}])
|> Enum.uniq(fn {user_id, _} -> user_id end)
|> Enum.map(fn {user_id, _} -> user_id end)
Expand All @@ -17,7 +17,7 @@ defmodule Poxa.PresenceChannel do
"""
@spec user_count(binary) :: non_neg_integer
def user_count(channel) do
match = {{:p, :l, {:pusher, channel}}, :_, :'$1'}
match = {{:p, :g, {:pusher, channel}}, :_, :'$1'}
:gproc.select([{match, [], [:'$1']}])
|> Enum.uniq(fn {user_id, _} -> user_id end)
|> Enum.count
Expand Down
16 changes: 8 additions & 8 deletions lib/poxa/presence_subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ defmodule Poxa.PresenceSubscription do
{user_id, user_info} = extract_userid_and_userinfo(decoded_channel_data)
unless user_id_already_on_presence_channel(user_id, channel) do
message = PusherEvent.presence_member_added(channel, user_id, user_info)
:gproc.send({:p, :l, {:pusher, channel}}, {self, message})
:gproc.send({:p, :g, {:pusher, channel}}, {self, message})
end
:gproc.reg({:p, :l, {:pusher, channel}}, {user_id, user_info})
:gproc.reg({:p, :g, {:pusher, channel}}, {user_id, user_info})
end
%__MODULE__{channel: channel, channel_data: channel_data(channel)}
end

defp channel_data(channel) do
for {_pid, {user_id, user_info}} <- :gproc.lookup_values({:p, :l, {:pusher, channel}}) do
for {_pid, {user_id, user_info}} <- :gproc.lookup_values({:p, :g, {:pusher, channel}}) do
{user_id, user_info}
end |> Enum.uniq(fn {user_id, _} -> user_id end)
end
Expand All @@ -54,7 +54,7 @@ defmodule Poxa.PresenceSubscription do
defp sanitize_user_id(user_id), do: JSX.encode!(user_id)

defp user_id_already_on_presence_channel(user_id, channel) do
match = {{:p, :l, {:pusher, channel}}, :_, {user_id, :_}}
match = {{:p, :g, {:pusher, channel}}, :_, {user_id, :_}}
:gproc.select_count([{match, [], [true]}]) != 0
end

Expand All @@ -64,7 +64,7 @@ defmodule Poxa.PresenceSubscription do
"""
@spec unsubscribe!(binary) :: {:ok, binary}
def unsubscribe!(channel) do
case :gproc.get_value({:p, :l, {:pusher, channel}}) do
case :gproc.get_value({:p, :g, {:pusher, channel}}) do
{user_id, _} ->
if only_one_connection_on_user_id?(channel, user_id) do
presence_member_removed(channel, user_id)
Expand All @@ -82,7 +82,7 @@ defmodule Poxa.PresenceSubscription do
"""
@spec check_and_remove :: :ok
def check_and_remove do
match = {{:p, :l, {:pusher, :'$1'}}, self, {:'$2', :_}}
match = {{:p, :g, {:pusher, :'$1'}}, self, {:'$2', :_}}
channel_user_id = :gproc.select([{match, [], [[:'$1',:'$2']]}])
for [channel, user_id] <- channel_user_id,
presence?(channel), only_one_connection_on_user_id?(channel, user_id) do
Expand All @@ -93,11 +93,11 @@ defmodule Poxa.PresenceSubscription do

defp presence_member_removed(channel, user_id) do
message = PusherEvent.presence_member_removed(channel, user_id)
:gproc.send({:p, :l, {:pusher, channel}}, {self, message})
:gproc.send({:p, :g, {:pusher, channel}}, {self, message})
end

defp only_one_connection_on_user_id?(channel, user_id) do
match = {{:p, :l, {:pusher, channel}}, :_, {user_id, :_}}
match = {{:p, :g, {:pusher, channel}}, :_, {user_id, :_}}
:gproc.select_count([{match, [], [true]}]) == 1
end
end
2 changes: 1 addition & 1 deletion lib/poxa/pusher_event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ defmodule Poxa.PusherEvent do
@spec publish_event_to_channel(Poxa.PusherEvent.t, binary) :: :ok
def publish_event_to_channel(event, channel) do
message = build_message(event, channel) |> encode!
:gproc.send({:p, :l, {:pusher, channel}}, {self, message, event.socket_id})
:gproc.send({:p, :g, {:pusher, channel}}, {self, message, event.socket_id})
end

defp build_message(event, channel) do
Expand Down
4 changes: 2 additions & 2 deletions lib/poxa/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ defmodule Poxa.Subscription do
Logger.info "Already subscribed #{inspect self} on channel #{channel}"
else
Logger.info "Registering #{inspect self} to channel #{channel}"
:gproc.reg({:p, :l, {:pusher, channel}})
:gproc.reg({:p, :g, {:pusher, channel}})
end
{:ok, channel}
end
Expand All @@ -69,7 +69,7 @@ defmodule Poxa.Subscription do
if Channel.presence?(channel) do
PresenceSubscription.unsubscribe!(channel);
end
:gproc.unreg({:p, :l, {:pusher, channel}});
:gproc.unreg({:p, :g, {:pusher, channel}});
else
Logger.debug "Already subscribed"
end
Expand Down
6 changes: 5 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ defmodule Poxa.Mixfile do
[ applications: [ :logger,
:crypto,
:gproc,
:renode,
:locks,
:cowboy ],
included_applications: [ :exjsx, :uuid, :signaturex ],
mod: { Poxa, [] } ]
Expand All @@ -22,7 +24,9 @@ defmodule Poxa.Mixfile do
[ {:cowboy, "~> 1.0.0" },
{:exjsx, "~> 3.0"},
{:signaturex, "~> 0.0.8"},
{:gproc, "~> 0.3.0"},
{:gproc, github: "uwiger/gproc", branch: "uw-locks_leader"},
{:locks, github: "uwiger/locks"},
{:renode, github: "edgurgel/renode"},
{:uuid, github: "avtobiff/erlang-uuid", tag: "v0.4.5" },
{:meck, "~> 0.8.2", only: :test},
{:pusher_client, github: "edgurgel/pusher_client", only: :test},
Expand Down
6 changes: 5 additions & 1 deletion mix.lock
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
%{"conform": {:hex, :conform, "0.10.5"},
"cowboy": {:hex, :cowboy, "1.0.0"},
"cowlib": {:hex, :cowlib, "1.0.0"},
"edown": {:git, "git://github.com/esl/edown.git", "4d18dc4bec89c20c0ba8cab6884ce690ca606f4a", [ref: "HEAD"]},
"exjsx": {:hex, :exjsx, "3.1.0"},
"exrm": {:hex, :exrm, "0.14.9"},
"gproc": {:hex, :gproc, "0.3.1"},
"gproc": {:git, "git://github.com/uwiger/gproc.git", "b439917a8bf630b6cfde2681e2a20e1d26f5ab21", [branch: "uw-locks_leader"]},
"hackney": {:hex, :hackney, "1.0.6"},
"httpoison": {:hex, :httpoison, "0.6.1"},
"idna": {:hex, :idna, "1.0.2"},
"inch_ex": {:hex, :inch_ex, "0.2.3"},
"jsx": {:hex, :jsx, "2.4.0"},
"locks": {:git, "git://github.com/uwiger/locks.git", "1ede3f1f5a9f0bcb758599050d7568b9b3d4659b", []},
"meck": {:hex, :meck, "0.8.2"},
"plain_fsm": {:git, "git://github.com/uwiger/plain_fsm.git", "30a9b20c733820d74b01830b59d4bd041cf10f05", [branch: "master"]},
"poison": {:hex, :poison, "1.2.0"},
"pusher": {:git, "git://github.com/edgurgel/pusher.git", "d8648e6b1e3ca447e7c6604eaf0e49fcf668f091", []},
"pusher_client": {:git, "git://github.com/edgurgel/pusher_client.git", "5f0f0c63289a6ab00a1b84589949d0cbd28f7944", []},
"ranch": {:hex, :ranch, "1.0.0"},
"renode": {:git, "git://github.com/edgurgel/renode.git", "3d61bbb48302f3cf595d006b0df1c9b4bf8864ce", []},
"signaturex": {:hex, :signaturex, "0.0.8"},
"ssl_verify_hostname": {:hex, :ssl_verify_hostname, "1.0.1"},
"uuid": {:git, "git://github.com/avtobiff/erlang-uuid.git", "2056ad3717fedccf3634757401f77272bccb4554", [tag: "v0.4.5"]},
Expand Down
4 changes: 2 additions & 2 deletions test/pusher_event_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ defmodule Poxa.PusherEventTest do
end

test "sending message to a channel" do
expect(:gproc, :send, [{[{:p, :l, {:pusher, "channel123"}}, {self, :msg, nil}], :ok}])
expect(:gproc, :send, [{[{:p, :g, {:pusher, "channel123"}}, {self, :msg, nil}], :ok}])
expected = %{channel: "channel123", data: "data", event: "event"}
expect(JSX, :encode!, [{[expected], :msg}])
event = %PusherEvent{channels: ["channel123"], data: "data", name: "event"}
Expand All @@ -91,7 +91,7 @@ defmodule Poxa.PusherEventTest do
test "sending message to channels excluding a socket id" do
expected = %{channel: "channel123", data: %{}, event: "event"}
expect(JSX, :encode!, [{[expected], :msg}])
expect(:gproc, :send, [{[{:p, :l, {:pusher, "channel123"}}, {self, :msg, "SocketId"}], :ok}])
expect(:gproc, :send, [{[{:p, :g, {:pusher, "channel123"}}, {self, :msg, "SocketId"}], :ok}])

assert publish(%PusherEvent{data: %{}, channels: ["channel123"], name: "event", socket_id: "SocketId"}) == :ok

Expand Down
2 changes: 1 addition & 1 deletion test/test_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ defmodule SpawnHelper do
end

def register_to_channel(channel, value \\ :undefined) do
:gproc.reg({:p, :l, {:pusher, channel}}, value)
:gproc.reg({:p, :g, {:pusher, channel}}, value)
end
end