From fab74d5f11c843814684f355a8d9d8b7b842664f Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Date: Sun, 22 Feb 2015 09:58:00 +1300 Subject: [PATCH] Use distributed gproc to handle multiple nodes --- config/config.exs | 5 +++++ lib/poxa/channel.ex | 8 ++++---- lib/poxa/presence_channel.ex | 4 ++-- lib/poxa/presence_subscription.ex | 16 ++++++++-------- lib/poxa/pusher_event.ex | 2 +- lib/poxa/subscription.ex | 4 ++-- mix.exs | 6 +++++- mix.lock | 6 +++++- test/pusher_event_test.exs | 4 ++-- test/test_helper.exs | 2 +- 10 files changed, 35 insertions(+), 22 deletions(-) diff --git a/config/config.exs b/config/config.exs index d2383e4..26c9a76 100644 --- a/config/config.exs +++ b/config/config.exs @@ -10,3 +10,8 @@ config :poxa, app_key: get_env("POXA_APP_KEY") || "app_key", app_secret: get_env("POXA_SECRET") || "secret", app_id: get_env("POXA_APP_ID") || "app_id" + +config :gproc, gproc_dist: :all + +config :renode, + nodes: [] diff --git a/lib/poxa/channel.ex b/lib/poxa/channel.ex index 93bf4b9..8a5d165 100644 --- a/lib/poxa/channel.ex +++ b/lib/poxa/channel.ex @@ -68,7 +68,7 @@ defmodule Poxa.Channel do """ @spec occupied?(binary) :: boolean def occupied?(channel) do - match = {{:p, :l, {:pusher, channel}}, :_, :_} + match = {{:p, :g, {:pusher, channel}}, :_, :_} :gproc.select_count([{match, [], [true]}]) != 0 end @@ -77,7 +77,7 @@ defmodule Poxa.Channel do """ @spec all(pid | :_) :: [binary] def all(pid \\ :_) do - match = {{:p, :l, {:pusher, :'$1'}}, pid, :_} + match = {{:p, :g, {:pusher, :'$1'}}, pid, :_} :gproc.select([{match, [], [:'$1']}]) |> Enum.uniq end @@ -86,7 +86,7 @@ defmodule Poxa.Channel do """ @spec subscribed?(binary, pid) :: boolean def subscribed?(channel, pid) do - match = {{:p, :l, {:pusher, channel}}, pid, :_} + match = {{:p, :g, {:pusher, channel}}, pid, :_} :gproc.select_count([{match, [], [true]}]) != 0 end @@ -95,7 +95,7 @@ defmodule Poxa.Channel do """ @spec subscription_count(binary) :: non_neg_integer def subscription_count(channel) do - match = {{:p, :l, {:pusher, channel}}, :_, :_} + match = {{:p, :g, {:pusher, channel}}, :_, :_} :gproc.select_count([{match, [], [true]}]) end end diff --git a/lib/poxa/presence_channel.ex b/lib/poxa/presence_channel.ex index b03a2ae..7eec515 100644 --- a/lib/poxa/presence_channel.ex +++ b/lib/poxa/presence_channel.ex @@ -6,7 +6,7 @@ defmodule Poxa.PresenceChannel do """ @spec users(binary) :: [binary | integer] def users(channel) do - match = {{:p, :l, {:pusher, channel}}, :_, :'$1'} + match = {{:p, :g, {:pusher, channel}}, :_, :'$1'} :gproc.select([{match, [], [:'$1']}]) |> Enum.uniq(fn {user_id, _} -> user_id end) |> Enum.map(fn {user_id, _} -> user_id end) @@ -17,7 +17,7 @@ defmodule Poxa.PresenceChannel do """ @spec user_count(binary) :: non_neg_integer def user_count(channel) do - match = {{:p, :l, {:pusher, channel}}, :_, :'$1'} + match = {{:p, :g, {:pusher, channel}}, :_, :'$1'} :gproc.select([{match, [], [:'$1']}]) |> Enum.uniq(fn {user_id, _} -> user_id end) |> Enum.count diff --git a/lib/poxa/presence_subscription.ex b/lib/poxa/presence_subscription.ex index 1d9ef2b..02e4a9b 100644 --- a/lib/poxa/presence_subscription.ex +++ b/lib/poxa/presence_subscription.ex @@ -31,15 +31,15 @@ defmodule Poxa.PresenceSubscription do {user_id, user_info} = extract_userid_and_userinfo(decoded_channel_data) unless user_id_already_on_presence_channel(user_id, channel) do message = PusherEvent.presence_member_added(channel, user_id, user_info) - :gproc.send({:p, :l, {:pusher, channel}}, {self, message}) + :gproc.send({:p, :g, {:pusher, channel}}, {self, message}) end - :gproc.reg({:p, :l, {:pusher, channel}}, {user_id, user_info}) + :gproc.reg({:p, :g, {:pusher, channel}}, {user_id, user_info}) end %__MODULE__{channel: channel, channel_data: channel_data(channel)} end defp channel_data(channel) do - for {_pid, {user_id, user_info}} <- :gproc.lookup_values({:p, :l, {:pusher, channel}}) do + for {_pid, {user_id, user_info}} <- :gproc.lookup_values({:p, :g, {:pusher, channel}}) do {user_id, user_info} end |> Enum.uniq(fn {user_id, _} -> user_id end) end @@ -54,7 +54,7 @@ defmodule Poxa.PresenceSubscription do defp sanitize_user_id(user_id), do: JSX.encode!(user_id) defp user_id_already_on_presence_channel(user_id, channel) do - match = {{:p, :l, {:pusher, channel}}, :_, {user_id, :_}} + match = {{:p, :g, {:pusher, channel}}, :_, {user_id, :_}} :gproc.select_count([{match, [], [true]}]) != 0 end @@ -64,7 +64,7 @@ defmodule Poxa.PresenceSubscription do """ @spec unsubscribe!(binary) :: {:ok, binary} def unsubscribe!(channel) do - case :gproc.get_value({:p, :l, {:pusher, channel}}) do + case :gproc.get_value({:p, :g, {:pusher, channel}}) do {user_id, _} -> if only_one_connection_on_user_id?(channel, user_id) do presence_member_removed(channel, user_id) @@ -82,7 +82,7 @@ defmodule Poxa.PresenceSubscription do """ @spec check_and_remove :: :ok def check_and_remove do - match = {{:p, :l, {:pusher, :'$1'}}, self, {:'$2', :_}} + match = {{:p, :g, {:pusher, :'$1'}}, self, {:'$2', :_}} channel_user_id = :gproc.select([{match, [], [[:'$1',:'$2']]}]) for [channel, user_id] <- channel_user_id, presence?(channel), only_one_connection_on_user_id?(channel, user_id) do @@ -93,11 +93,11 @@ defmodule Poxa.PresenceSubscription do defp presence_member_removed(channel, user_id) do message = PusherEvent.presence_member_removed(channel, user_id) - :gproc.send({:p, :l, {:pusher, channel}}, {self, message}) + :gproc.send({:p, :g, {:pusher, channel}}, {self, message}) end defp only_one_connection_on_user_id?(channel, user_id) do - match = {{:p, :l, {:pusher, channel}}, :_, {user_id, :_}} + match = {{:p, :g, {:pusher, channel}}, :_, {user_id, :_}} :gproc.select_count([{match, [], [true]}]) == 1 end end diff --git a/lib/poxa/pusher_event.ex b/lib/poxa/pusher_event.ex index 7fdfa38..1ca34df 100644 --- a/lib/poxa/pusher_event.ex +++ b/lib/poxa/pusher_event.ex @@ -191,7 +191,7 @@ defmodule Poxa.PusherEvent do @spec publish_event_to_channel(Poxa.PusherEvent.t, binary) :: :ok def publish_event_to_channel(event, channel) do message = build_message(event, channel) |> encode! - :gproc.send({:p, :l, {:pusher, channel}}, {self, message, event.socket_id}) + :gproc.send({:p, :g, {:pusher, channel}}, {self, message, event.socket_id}) end defp build_message(event, channel) do diff --git a/lib/poxa/subscription.ex b/lib/poxa/subscription.ex index 0c8f15a..cc641e7 100644 --- a/lib/poxa/subscription.ex +++ b/lib/poxa/subscription.ex @@ -54,7 +54,7 @@ defmodule Poxa.Subscription do Logger.info "Already subscribed #{inspect self} on channel #{channel}" else Logger.info "Registering #{inspect self} to channel #{channel}" - :gproc.reg({:p, :l, {:pusher, channel}}) + :gproc.reg({:p, :g, {:pusher, channel}}) end {:ok, channel} end @@ -69,7 +69,7 @@ defmodule Poxa.Subscription do if Channel.presence?(channel) do PresenceSubscription.unsubscribe!(channel); end - :gproc.unreg({:p, :l, {:pusher, channel}}); + :gproc.unreg({:p, :g, {:pusher, channel}}); else Logger.debug "Already subscribed" end diff --git a/mix.exs b/mix.exs index 2f98981..c441258 100644 --- a/mix.exs +++ b/mix.exs @@ -13,6 +13,8 @@ defmodule Poxa.Mixfile do [ applications: [ :logger, :crypto, :gproc, + :renode, + :locks, :cowboy ], included_applications: [ :exjsx, :uuid, :signaturex ], mod: { Poxa, [] } ] @@ -22,7 +24,9 @@ defmodule Poxa.Mixfile do [ {:cowboy, "~> 1.0.0" }, {:exjsx, "~> 3.0"}, {:signaturex, "~> 0.0.8"}, - {:gproc, "~> 0.3.0"}, + {:gproc, github: "uwiger/gproc", branch: "uw-locks_leader"}, + {:locks, github: "uwiger/locks"}, + {:renode, github: "edgurgel/renode"}, {:uuid, github: "avtobiff/erlang-uuid", tag: "v0.4.5" }, {:meck, "~> 0.8.2", only: :test}, {:pusher_client, github: "edgurgel/pusher_client", only: :test}, diff --git a/mix.lock b/mix.lock index 41699a9..2f74dce 100644 --- a/mix.lock +++ b/mix.lock @@ -1,19 +1,23 @@ %{"conform": {:hex, :conform, "0.10.5"}, "cowboy": {:hex, :cowboy, "1.0.0"}, "cowlib": {:hex, :cowlib, "1.0.0"}, + "edown": {:git, "git://github.com/esl/edown.git", "4d18dc4bec89c20c0ba8cab6884ce690ca606f4a", [ref: "HEAD"]}, "exjsx": {:hex, :exjsx, "3.1.0"}, "exrm": {:hex, :exrm, "0.14.9"}, - "gproc": {:hex, :gproc, "0.3.1"}, + "gproc": {:git, "git://github.com/uwiger/gproc.git", "b439917a8bf630b6cfde2681e2a20e1d26f5ab21", [branch: "uw-locks_leader"]}, "hackney": {:hex, :hackney, "1.0.6"}, "httpoison": {:hex, :httpoison, "0.6.1"}, "idna": {:hex, :idna, "1.0.2"}, "inch_ex": {:hex, :inch_ex, "0.2.3"}, "jsx": {:hex, :jsx, "2.4.0"}, + "locks": {:git, "git://github.com/uwiger/locks.git", "1ede3f1f5a9f0bcb758599050d7568b9b3d4659b", []}, "meck": {:hex, :meck, "0.8.2"}, + "plain_fsm": {:git, "git://github.com/uwiger/plain_fsm.git", "30a9b20c733820d74b01830b59d4bd041cf10f05", [branch: "master"]}, "poison": {:hex, :poison, "1.2.0"}, "pusher": {:git, "git://github.com/edgurgel/pusher.git", "d8648e6b1e3ca447e7c6604eaf0e49fcf668f091", []}, "pusher_client": {:git, "git://github.com/edgurgel/pusher_client.git", "5f0f0c63289a6ab00a1b84589949d0cbd28f7944", []}, "ranch": {:hex, :ranch, "1.0.0"}, + "renode": {:git, "git://github.com/edgurgel/renode.git", "3d61bbb48302f3cf595d006b0df1c9b4bf8864ce", []}, "signaturex": {:hex, :signaturex, "0.0.8"}, "ssl_verify_hostname": {:hex, :ssl_verify_hostname, "1.0.1"}, "uuid": {:git, "git://github.com/avtobiff/erlang-uuid.git", "2056ad3717fedccf3634757401f77272bccb4554", [tag: "v0.4.5"]}, diff --git a/test/pusher_event_test.exs b/test/pusher_event_test.exs index 129e0b4..da50ba5 100644 --- a/test/pusher_event_test.exs +++ b/test/pusher_event_test.exs @@ -77,7 +77,7 @@ defmodule Poxa.PusherEventTest do end test "sending message to a channel" do - expect(:gproc, :send, [{[{:p, :l, {:pusher, "channel123"}}, {self, :msg, nil}], :ok}]) + expect(:gproc, :send, [{[{:p, :g, {:pusher, "channel123"}}, {self, :msg, nil}], :ok}]) expected = %{channel: "channel123", data: "data", event: "event"} expect(JSX, :encode!, [{[expected], :msg}]) event = %PusherEvent{channels: ["channel123"], data: "data", name: "event"} @@ -91,7 +91,7 @@ defmodule Poxa.PusherEventTest do test "sending message to channels excluding a socket id" do expected = %{channel: "channel123", data: %{}, event: "event"} expect(JSX, :encode!, [{[expected], :msg}]) - expect(:gproc, :send, [{[{:p, :l, {:pusher, "channel123"}}, {self, :msg, "SocketId"}], :ok}]) + expect(:gproc, :send, [{[{:p, :g, {:pusher, "channel123"}}, {self, :msg, "SocketId"}], :ok}]) assert publish(%PusherEvent{data: %{}, channels: ["channel123"], name: "event", socket_id: "SocketId"}) == :ok diff --git a/test/test_helper.exs b/test/test_helper.exs index 0211338..a20f713 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -27,6 +27,6 @@ defmodule SpawnHelper do end def register_to_channel(channel, value \\ :undefined) do - :gproc.reg({:p, :l, {:pusher, channel}}, value) + :gproc.reg({:p, :g, {:pusher, channel}}, value) end end